From d2dde382e6423afd88217128dbc66dde74415dca Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sat, 2 Jun 2018 01:39:25 +0200 Subject: Receive data with epoll_wait --- .kdev4/sibs-pubsub.kdev4 | 17 +++++ README.md | 4 +- include/DirectConnection.hpp | 44 ++++++++++-- include/Log.hpp | 12 ++++ project.conf | 2 +- sibs-pubsub.kdev4 | 4 ++ src/DirectConnection.cpp | 168 ++++++++++++++++++++++++++++++++++--------- src/Log.cpp | 43 +++++++++++ tests/main.cpp | 1 + 9 files changed, 252 insertions(+), 43 deletions(-) create mode 100644 .kdev4/sibs-pubsub.kdev4 create mode 100644 include/Log.hpp create mode 100644 sibs-pubsub.kdev4 create mode 100644 src/Log.cpp diff --git a/.kdev4/sibs-pubsub.kdev4 b/.kdev4/sibs-pubsub.kdev4 new file mode 100644 index 0000000..3f12798 --- /dev/null +++ b/.kdev4/sibs-pubsub.kdev4 @@ -0,0 +1,17 @@ +[Buildset] +BuildItems=@Variant(\x00\x00\x00\t\x00\x00\x00\x00\x01\x00\x00\x00\x0b\x00\x00\x00\x00\x01\x00\x00\x00\x0e\x00u\x00d\x00t\x00-\x00d\x00h\x00t) + +[CustomDefinesAndIncludes][ProjectPath0] +Path=. +parseAmbiguousAsCPP=true +parserArguments=-ferror-limit=100 -fspell-checking -Wdocumentation -Wunused-parameter -Wunreachable-code -Wall -std=c++11 +parserArgumentsC=-ferror-limit=100 -fspell-checking -Wdocumentation -Wunused-parameter -Wunreachable-code -Wall -std=c99 + +[CustomDefinesAndIncludes][ProjectPath0][Compiler] +Name=Clang + +[CustomDefinesAndIncludes][ProjectPath0][Includes] +1=/home/dec05eba/.cache/sibs/lib/udt/4.11/include + +[Project] +VersionControlSupport=kdevgit diff --git a/README.md b/README.md index 5dd2e4b..3dd0713 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ -# udt-dht -DHT using UDT for connection/data transfer +# sibs pubsub +Publish/subscription using UDT for connection/data transfer diff --git a/include/DirectConnection.hpp b/include/DirectConnection.hpp index c11871f..4075d12 100644 --- a/include/DirectConnection.hpp +++ b/include/DirectConnection.hpp @@ -1,6 +1,12 @@ #pragma once #include +#include +#include +#include +#include +#include +#include #include "types.hpp" #include "utils.hpp" @@ -20,6 +26,12 @@ namespace sibs ConnectionException(const std::string &errMsg) : std::runtime_error(errMsg) {} }; + class SendException : public std::runtime_error + { + public: + SendException(const std::string &errMsg) : std::runtime_error(errMsg) {} + }; + class Ipv4 { DISABLE_COPY(Ipv4) @@ -31,19 +43,41 @@ namespace sibs struct addrinfo *address; }; + enum class PubSubConnectResult + { + OK, + ERROR + }; + + using PubSubConnectCallback = std::function; + using PubSubReceiveDataCallback = std::function; + + struct DirectConnectionPeer + { + int socket; + PubSubReceiveDataCallback receiveDataCallbackFunc; + }; + class DirectConnections { DISABLE_COPY(DirectConnections) public: - DirectConnections(); + DirectConnections(u16 port = 27137); ~DirectConnections(); - void connect(const Ipv4 &address); + // Throws ConnectionException on error + std::shared_ptr connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc); + // Throws SendException on error + void send(const std::shared_ptr &peer, const void *data, const usize size); private: - void init(); - void cleanup(); + void receiveData(); + bool receiveDataFromPeer(const int socket, char *output); private: + u16 port; int eid; - int mySocket; + std::unordered_map> peers; + std::thread receiveDataThread; + std::mutex peersMutex; + bool alive; }; } diff --git a/include/Log.hpp b/include/Log.hpp new file mode 100644 index 0000000..1301c5b --- /dev/null +++ b/include/Log.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace sibs +{ + class Log + { + public: + static void debug(const char *fmt, ...); + static void warn(const char *fmt, ...); + static void error(const char *fmt, ...); + }; +} diff --git a/project.conf b/project.conf index 6560f79..3365c0c 100644 --- a/project.conf +++ b/project.conf @@ -1,5 +1,5 @@ [package] -name = "udt-dht" +name = "sibs-pubsub" version = "0.1.0" type = "static" tests = "tests" diff --git a/sibs-pubsub.kdev4 b/sibs-pubsub.kdev4 new file mode 100644 index 0000000..3c65984 --- /dev/null +++ b/sibs-pubsub.kdev4 @@ -0,0 +1,4 @@ +[Project] +CreatedFrom= +Manager=KDevCustomBuildSystem +Name=sibs-pubsub diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 12ae761..3e79658 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -1,4 +1,5 @@ #include "../include/DirectConnection.hpp" +#include "../include/Log.hpp" #include #include @@ -13,6 +14,9 @@ namespace sibs { + // Max received data size allowed when receiving regular data, receive data as file to receive more data + const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb + Ipv4::Ipv4(const char *ip, u16 port) { struct addrinfo hints = {}; @@ -37,33 +41,42 @@ namespace sibs freeaddrinfo(address); } - DirectConnections::DirectConnections() : - mySocket(0) + DirectConnections::DirectConnections(u16 _port) : + port(_port), + alive(true) { - try - { - init(); - } - catch(...) - { - cleanup(); - } + UDT::startup(); + eid = UDT::epoll_create(); + receiveDataThread = std::thread(&DirectConnections::receiveData, this); } DirectConnections::~DirectConnections() { - cleanup(); + alive = false; + receiveDataThread.join(); + + for(auto &peer : peers) + { + UDT::close(peer.first); + } + UDT::epoll_release(eid); + UDT::cleanup(); } - void DirectConnections::init() + std::shared_ptr DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) { - UDT::startup(); - eid = UDT::epoll_create(); - mySocket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM); + UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM); + if(socket == UDT::INVALID_SOCK) + { + std::string errMsg = "UDT: Failed to create socket, error: "; + errMsg += UDT::getlasterror_desc(); + throw ConnectionException(errMsg); + } + bool rendezvous = true; - UDT::setsockopt(mySocket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); + UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); bool reuseAddr = true; - UDT::setsockopt(mySocket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool)); + UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool)); // Windows UDP issue // For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold @@ -74,39 +87,124 @@ namespace sibs sockaddr_in myAddr = {}; myAddr.sin_family = AF_INET; - myAddr.sin_port = htons(9000); + myAddr.sin_port = htons(port); myAddr.sin_addr.s_addr = INADDR_ANY; memset(&myAddr.sin_zero, '\0', 8); - if(UDT::bind(mySocket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR) + if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR) { - // TODO: Add ip and port to error std::string errMsg = "UDT: Failed to bind, error: "; - errMsg += UDT::getlasterror().getErrorMessage(); + errMsg += UDT::getlasterror_desc(); throw ConnectionException(errMsg); } + + if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR) + { + std::string errMsg = "UDT: Failed to connect, error: "; + errMsg += UDT::getlasterror_desc(); + throw ConnectionException(errMsg); + } + + UDT::epoll_add_usock(eid, socket); + std::shared_ptr peer = std::make_shared(); + peer->socket = socket; + peer->receiveDataCallbackFunc = receiveDataCallbackFunc; + peersMutex.lock(); + peers[socket] = peer; + peersMutex.unlock(); + return peer; } - void DirectConnections::cleanup() + void DirectConnections::send(const std::shared_ptr &peer, const void *data, const usize size) { - UDT::epoll_release(eid); - - if(mySocket != 0) - UDT::close(mySocket); + usize sentSizeTotal = 0; + while(sentSizeTotal < size) + { + int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0); + if(sentSize == UDT::ERROR) + { + std::string errMsg = "UDT: Failed to send data, error: "; + errMsg += UDT::getlasterror_desc(); + throw SendException(errMsg); + } + sentSizeTotal += sentSize; + } + } + + void DirectConnections::receiveData() + { + std::vector data; + data.reserve(MAX_RECEIVED_DATA_SIZE); - UDT::cleanup(); + std::set readfds; + while(alive) + { + int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 250); + if(numfsReady == 0) + continue; + else if(numfsReady == -1) + { + if(UDT::getlasterror_code() == UDT::ERRORINFO::ETIMEOUT) + continue; + else + { + Log::error("UDT: Stop receiving data, got error: %s", UDT::getlasterror_desc()); + return; + } + } + + for(UDTSOCKET receivedDataFromPeer : readfds) + { + bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data()); + if(receivedData) + { + peersMutex.lock(); + auto peer = peers[receivedDataFromPeer]; + peersMutex.unlock(); + try + { + if(peer->receiveDataCallbackFunc) + peer->receiveDataCallbackFunc(data.data(), data.size()); + } + catch(std::exception &e) + { + Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what()); + } + } + } + readfds.clear(); + } } - void DirectConnections::connect(const Ipv4 &address) + bool DirectConnections::receiveDataFromPeer(const int socket, char *output) { - if(UDT::connect(mySocket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR) + usize receivedTotalSize = 0; + while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE) { - // TODO: Add ip and port to error - std::string errMsg = "UDT: Failed to connect, error: "; - errMsg += UDT::getlasterror().getErrorMessage(); - throw ConnectionException(errMsg); + usize dataAvailableSize; + int receiveSizeDataTypeSize = sizeof(dataAvailableSize); + if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) + { + Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc()); + return false; + } + + int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0); + if(receivedSize == UDT::ERROR) + { + Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc()); + return false; + } + + if(receivedSize == 0) + { + return true; + } + + receivedTotalSize += dataAvailableSize; } - //UDT::epoll_add_usock(eid, 2); - } + Log::error("UDT: Received too much data, ignoring..."); + return false; + } } diff --git a/src/Log.cpp b/src/Log.cpp new file mode 100644 index 0000000..df424f0 --- /dev/null +++ b/src/Log.cpp @@ -0,0 +1,43 @@ +#include "../include/Log.hpp" +#include +#include +#include + +namespace sibs +{ + // TODO: Disable color if stdout/stderr is not tty + static std::mutex mutexLog; + + void Log::debug(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;32mDebug:\033[0m ", stdout); + vfprintf(stdout, fmt, args); + fputs("\n", stdout); + va_end(args); + } + + void Log::warn(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;33mWarning:\033[0m ", stdout); + vfprintf(stdout, fmt, args); + fputs("\n", stdout); + va_end(args); + } + + void Log::error(const char *fmt, ...) + { + std::lock_guard lock(mutexLog); + va_list args; + va_start(args, fmt); + fputs("\033[1;31mError:\033[0m ", stderr); + vfprintf(stderr, fmt, args); + fputs("\n", stderr); + va_end(args); + } +} diff --git a/tests/main.cpp b/tests/main.cpp index 63e50c3..6b9a36f 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -2,5 +2,6 @@ int main() { + sibs::DirectConnections user1(27137); return 0; } -- cgit v1.2.3