aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-06-02 01:39:25 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commitd2dde382e6423afd88217128dbc66dde74415dca (patch)
treefc6259687ccf451171a27566ce0c9f40cbbb9048 /src
parent441cf81acf9dff087addfa8d01a61e4513b1dd6e (diff)
Receive data with epoll_wait
Diffstat (limited to 'src')
-rw-r--r--src/DirectConnection.cpp168
-rw-r--r--src/Log.cpp43
2 files changed, 176 insertions, 35 deletions
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 12ae761..3e79658 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -1,4 +1,5 @@
#include "../include/DirectConnection.hpp"
+#include "../include/Log.hpp"
#include <cstdio>
#include <cstring>
@@ -13,6 +14,9 @@
namespace sibs
{
+ // Max received data size allowed when receiving regular data, receive data as file to receive more data
+ const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb
+
Ipv4::Ipv4(const char *ip, u16 port)
{
struct addrinfo hints = {};
@@ -37,33 +41,42 @@ namespace sibs
freeaddrinfo(address);
}
- DirectConnections::DirectConnections() :
- mySocket(0)
+ DirectConnections::DirectConnections(u16 _port) :
+ port(_port),
+ alive(true)
{
- try
- {
- init();
- }
- catch(...)
- {
- cleanup();
- }
+ UDT::startup();
+ eid = UDT::epoll_create();
+ receiveDataThread = std::thread(&DirectConnections::receiveData, this);
}
DirectConnections::~DirectConnections()
{
- cleanup();
+ alive = false;
+ receiveDataThread.join();
+
+ for(auto &peer : peers)
+ {
+ UDT::close(peer.first);
+ }
+ UDT::epoll_release(eid);
+ UDT::cleanup();
}
- void DirectConnections::init()
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDT::startup();
- eid = UDT::epoll_create();
- mySocket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM);
+ UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM);
+ if(socket == UDT::INVALID_SOCK)
+ {
+ std::string errMsg = "UDT: Failed to create socket, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw ConnectionException(errMsg);
+ }
+
bool rendezvous = true;
- UDT::setsockopt(mySocket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
+ UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
bool reuseAddr = true;
- UDT::setsockopt(mySocket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool));
+ UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool));
// Windows UDP issue
// For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold
@@ -74,39 +87,124 @@ namespace sibs
sockaddr_in myAddr = {};
myAddr.sin_family = AF_INET;
- myAddr.sin_port = htons(9000);
+ myAddr.sin_port = htons(port);
myAddr.sin_addr.s_addr = INADDR_ANY;
memset(&myAddr.sin_zero, '\0', 8);
- if(UDT::bind(mySocket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR)
+ if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR)
{
- // TODO: Add ip and port to error
std::string errMsg = "UDT: Failed to bind, error: ";
- errMsg += UDT::getlasterror().getErrorMessage();
+ errMsg += UDT::getlasterror_desc();
throw ConnectionException(errMsg);
}
+
+ if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR)
+ {
+ std::string errMsg = "UDT: Failed to connect, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw ConnectionException(errMsg);
+ }
+
+ UDT::epoll_add_usock(eid, socket);
+ std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
+ peer->socket = socket;
+ peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
+ peersMutex.lock();
+ peers[socket] = peer;
+ peersMutex.unlock();
+ return peer;
}
- void DirectConnections::cleanup()
+ void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size)
{
- UDT::epoll_release(eid);
-
- if(mySocket != 0)
- UDT::close(mySocket);
+ usize sentSizeTotal = 0;
+ while(sentSizeTotal < size)
+ {
+ int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0);
+ if(sentSize == UDT::ERROR)
+ {
+ std::string errMsg = "UDT: Failed to send data, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw SendException(errMsg);
+ }
+ sentSizeTotal += sentSize;
+ }
+ }
+
+ void DirectConnections::receiveData()
+ {
+ std::vector<char> data;
+ data.reserve(MAX_RECEIVED_DATA_SIZE);
- UDT::cleanup();
+ std::set<UDTSOCKET> readfds;
+ while(alive)
+ {
+ int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 250);
+ if(numfsReady == 0)
+ continue;
+ else if(numfsReady == -1)
+ {
+ if(UDT::getlasterror_code() == UDT::ERRORINFO::ETIMEOUT)
+ continue;
+ else
+ {
+ Log::error("UDT: Stop receiving data, got error: %s", UDT::getlasterror_desc());
+ return;
+ }
+ }
+
+ for(UDTSOCKET receivedDataFromPeer : readfds)
+ {
+ bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data());
+ if(receivedData)
+ {
+ peersMutex.lock();
+ auto peer = peers[receivedDataFromPeer];
+ peersMutex.unlock();
+ try
+ {
+ if(peer->receiveDataCallbackFunc)
+ peer->receiveDataCallbackFunc(data.data(), data.size());
+ }
+ catch(std::exception &e)
+ {
+ Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what());
+ }
+ }
+ }
+ readfds.clear();
+ }
}
- void DirectConnections::connect(const Ipv4 &address)
+ bool DirectConnections::receiveDataFromPeer(const int socket, char *output)
{
- if(UDT::connect(mySocket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR)
+ usize receivedTotalSize = 0;
+ while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE)
{
- // TODO: Add ip and port to error
- std::string errMsg = "UDT: Failed to connect, error: ";
- errMsg += UDT::getlasterror().getErrorMessage();
- throw ConnectionException(errMsg);
+ usize dataAvailableSize;
+ int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
+ if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc());
+ return false;
+ }
+
+ int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0);
+ if(receivedSize == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc());
+ return false;
+ }
+
+ if(receivedSize == 0)
+ {
+ return true;
+ }
+
+ receivedTotalSize += dataAvailableSize;
}
- //UDT::epoll_add_usock(eid, 2);
- }
+ Log::error("UDT: Received too much data, ignoring...");
+ return false;
+ }
}
diff --git a/src/Log.cpp b/src/Log.cpp
new file mode 100644
index 0000000..df424f0
--- /dev/null
+++ b/src/Log.cpp
@@ -0,0 +1,43 @@
+#include "../include/Log.hpp"
+#include <cstdarg>
+#include <cstdio>
+#include <mutex>
+
+namespace sibs
+{
+ // TODO: Disable color if stdout/stderr is not tty
+ static std::mutex mutexLog;
+
+ void Log::debug(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;32mDebug:\033[0m ", stdout);
+ vfprintf(stdout, fmt, args);
+ fputs("\n", stdout);
+ va_end(args);
+ }
+
+ void Log::warn(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;33mWarning:\033[0m ", stdout);
+ vfprintf(stdout, fmt, args);
+ fputs("\n", stdout);
+ va_end(args);
+ }
+
+ void Log::error(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;31mError:\033[0m ", stderr);
+ vfprintf(stderr, fmt, args);
+ fputs("\n", stderr);
+ va_end(args);
+ }
+}