From d2dde382e6423afd88217128dbc66dde74415dca Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sat, 2 Jun 2018 01:39:25 +0200 Subject: Receive data with epoll_wait --- src/DirectConnection.cpp | 168 +++++++++++++++++++++++++++++++++++++---------- src/Log.cpp | 43 ++++++++++++ 2 files changed, 176 insertions(+), 35 deletions(-) create mode 100644 src/Log.cpp (limited to 'src') diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 12ae761..3e79658 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -1,4 +1,5 @@ #include "../include/DirectConnection.hpp" +#include "../include/Log.hpp" #include #include @@ -13,6 +14,9 @@ namespace sibs { + // Max received data size allowed when receiving regular data, receive data as file to receive more data + const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb + Ipv4::Ipv4(const char *ip, u16 port) { struct addrinfo hints = {}; @@ -37,33 +41,42 @@ namespace sibs freeaddrinfo(address); } - DirectConnections::DirectConnections() : - mySocket(0) + DirectConnections::DirectConnections(u16 _port) : + port(_port), + alive(true) { - try - { - init(); - } - catch(...) - { - cleanup(); - } + UDT::startup(); + eid = UDT::epoll_create(); + receiveDataThread = std::thread(&DirectConnections::receiveData, this); } DirectConnections::~DirectConnections() { - cleanup(); + alive = false; + receiveDataThread.join(); + + for(auto &peer : peers) + { + UDT::close(peer.first); + } + UDT::epoll_release(eid); + UDT::cleanup(); } - void DirectConnections::init() + std::shared_ptr DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) { - UDT::startup(); - eid = UDT::epoll_create(); - mySocket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM); + UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM); + if(socket == UDT::INVALID_SOCK) + { + std::string errMsg = "UDT: Failed to create socket, error: "; + errMsg += UDT::getlasterror_desc(); + throw ConnectionException(errMsg); + } + bool rendezvous = true; - UDT::setsockopt(mySocket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); + UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); bool reuseAddr = true; - UDT::setsockopt(mySocket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool)); + UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool)); // Windows UDP issue // For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold @@ -74,39 +87,124 @@ namespace sibs sockaddr_in myAddr = {}; myAddr.sin_family = AF_INET; - myAddr.sin_port = htons(9000); + myAddr.sin_port = htons(port); myAddr.sin_addr.s_addr = INADDR_ANY; memset(&myAddr.sin_zero, '\0', 8); - if(UDT::bind(mySocket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR) + if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR) { - // TODO: Add ip and port to error std::string errMsg = "UDT: Failed to bind, error: "; - errMsg += UDT::getlasterror().getErrorMessage(); + errMsg += UDT::getlasterror_desc(); throw ConnectionException(errMsg); } + + if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR) + { + std::string errMsg = "UDT: Failed to connect, error: "; + errMsg += UDT::getlasterror_desc(); + throw ConnectionException(errMsg); + } + + UDT::epoll_add_usock(eid, socket); + std::shared_ptr peer = std::make_shared(); + peer->socket = socket; + peer->receiveDataCallbackFunc = receiveDataCallbackFunc; + peersMutex.lock(); + peers[socket] = peer; + peersMutex.unlock(); + return peer; } - void DirectConnections::cleanup() + void DirectConnections::send(const std::shared_ptr &peer, const void *data, const usize size) { - UDT::epoll_release(eid); - - if(mySocket != 0) - UDT::close(mySocket); + usize sentSizeTotal = 0; + while(sentSizeTotal < size) + { + int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0); + if(sentSize == UDT::ERROR) + { + std::string errMsg = "UDT: Failed to send data, error: "; + errMsg += UDT::getlasterror_desc(); + throw SendException(errMsg); + } + sentSizeTotal += sentSize; + } + } + + void DirectConnections::receiveData() + { + std::vector data; + data.reserve(MAX_RECEIVED_DATA_SIZE); - UDT::cleanup(); + std::set readfds; + while(alive) + { + int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 250); + if(numfsReady == 0) + continue; + else if(numfsReady == -1) + { + if(UDT::getlasterror_code() == UDT::ERRORINFO::ETIMEOUT) + continue; + else + { + Log::error("UDT: Stop receiving data, got error: %s", UDT::getlasterror_desc()); + return; + } + } + + for(UDTSOCKET receivedDataFromPeer : readfds) + { + bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data()); + if(receivedData) + { + peersMutex.lock(); + auto peer = peers[receivedDataFromPeer]; + peersMutex.unlock(); + try + { + if(peer->receiveDataCallbackFunc) + peer->receiveDataCallbackFunc(data.data(), data.size()); + } + catch(std::exception &e) + { + Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what()); + } + } + } + readfds.clear(); + } } - void DirectConnections::connect(const Ipv4 &address) + bool DirectConnections::receiveDataFromPeer(const int socket, char *output) { - if(UDT::connect(mySocket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR) + usize receivedTotalSize = 0; + while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE) { - // TODO: Add ip and port to error - std::string errMsg = "UDT: Failed to connect, error: "; - errMsg += UDT::getlasterror().getErrorMessage(); - throw ConnectionException(errMsg); + usize dataAvailableSize; + int receiveSizeDataTypeSize = sizeof(dataAvailableSize); + if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) + { + Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc()); + return false; + } + + int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0); + if(receivedSize == UDT::ERROR) + { + Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc()); + return false; + } + + if(receivedSize == 0) + { + return true; + } + + receivedTotalSize += dataAvailableSize; } - //UDT::epoll_add_usock(eid, 2); - } + Log::error("UDT: Received too much data, ignoring..."); + return false; + } } diff --git a/src/Log.cpp b/src/Log.cpp new file mode 100644 index 0000000..df424f0 --- /dev/null +++ b/src/Log.cpp @@ -0,0 +1,43 @@ +#include "../include/Log.hpp" +#include +#include +#include + +namespace sibs +{ + // TODO: Disable color if stdout/stderr is not tty + static std::mutex mutexLog; + + void Log::debug(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;32mDebug:\033[0m ", stdout); + vfprintf(stdout, fmt, args); + fputs("\n", stdout); + va_end(args); + } + + void Log::warn(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;33mWarning:\033[0m ", stdout); + vfprintf(stdout, fmt, args); + fputs("\n", stdout); + va_end(args); + } + + void Log::error(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;31mError:\033[0m ", stderr); + vfprintf(stderr, fmt, args); + fputs("\n", stderr); + va_end(args); + } +} -- cgit v1.2.3