aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-06-02 01:39:25 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-06-02 01:39:29 +0200
commitad7cc4dbe4dacaa5cccfd1c9adeccc0ece2299f4 (patch)
treeb045f3bc6068fb797203953d661bad5fcc9c509f
parent595b1b7d2fc7aa084296978b8a38a26ed960a0f5 (diff)
Receive data with epoll_wait
-rw-r--r--.kdev4/sibs-pubsub.kdev417
-rw-r--r--README.md4
-rw-r--r--include/DirectConnection.hpp44
-rw-r--r--include/Log.hpp12
-rw-r--r--project.conf2
-rw-r--r--sibs-pubsub.kdev44
-rw-r--r--src/DirectConnection.cpp168
-rw-r--r--src/Log.cpp43
-rw-r--r--tests/main.cpp1
9 files changed, 252 insertions, 43 deletions
diff --git a/.kdev4/sibs-pubsub.kdev4 b/.kdev4/sibs-pubsub.kdev4
new file mode 100644
index 0000000..3f12798
--- /dev/null
+++ b/.kdev4/sibs-pubsub.kdev4
@@ -0,0 +1,17 @@
+[Buildset]
+BuildItems=@Variant(\x00\x00\x00\t\x00\x00\x00\x00\x01\x00\x00\x00\x0b\x00\x00\x00\x00\x01\x00\x00\x00\x0e\x00u\x00d\x00t\x00-\x00d\x00h\x00t)
+
+[CustomDefinesAndIncludes][ProjectPath0]
+Path=.
+parseAmbiguousAsCPP=true
+parserArguments=-ferror-limit=100 -fspell-checking -Wdocumentation -Wunused-parameter -Wunreachable-code -Wall -std=c++11
+parserArgumentsC=-ferror-limit=100 -fspell-checking -Wdocumentation -Wunused-parameter -Wunreachable-code -Wall -std=c99
+
+[CustomDefinesAndIncludes][ProjectPath0][Compiler]
+Name=Clang
+
+[CustomDefinesAndIncludes][ProjectPath0][Includes]
+1=/home/dec05eba/.cache/sibs/lib/udt/4.11/include
+
+[Project]
+VersionControlSupport=kdevgit
diff --git a/README.md b/README.md
index 5dd2e4b..3dd0713 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,2 @@
-# udt-dht
-DHT using UDT for connection/data transfer
+# sibs pubsub
+Publish/subscription using UDT for connection/data transfer
diff --git a/include/DirectConnection.hpp b/include/DirectConnection.hpp
index c11871f..4075d12 100644
--- a/include/DirectConnection.hpp
+++ b/include/DirectConnection.hpp
@@ -1,6 +1,12 @@
#pragma once
#include <stdexcept>
+#include <unordered_map>
+#include <functional>
+#include <string>
+#include <memory>
+#include <thread>
+#include <mutex>
#include "types.hpp"
#include "utils.hpp"
@@ -20,6 +26,12 @@ namespace sibs
ConnectionException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
+ class SendException : public std::runtime_error
+ {
+ public:
+ SendException(const std::string &errMsg) : std::runtime_error(errMsg) {}
+ };
+
class Ipv4
{
DISABLE_COPY(Ipv4)
@@ -31,19 +43,41 @@ namespace sibs
struct addrinfo *address;
};
+ enum class PubSubConnectResult
+ {
+ OK,
+ ERROR
+ };
+
+ using PubSubConnectCallback = std::function<void(PubSubConnectResult result, const std::string &resultStr)>;
+ using PubSubReceiveDataCallback = std::function<void(const void *data, const usize size)>;
+
+ struct DirectConnectionPeer
+ {
+ int socket;
+ PubSubReceiveDataCallback receiveDataCallbackFunc;
+ };
+
class DirectConnections
{
DISABLE_COPY(DirectConnections)
public:
- DirectConnections();
+ DirectConnections(u16 port = 27137);
~DirectConnections();
- void connect(const Ipv4 &address);
+ // Throws ConnectionException on error
+ std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc);
+ // Throws SendException on error
+ void send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size);
private:
- void init();
- void cleanup();
+ void receiveData();
+ bool receiveDataFromPeer(const int socket, char *output);
private:
+ u16 port;
int eid;
- int mySocket;
+ std::unordered_map<int, std::shared_ptr<DirectConnectionPeer>> peers;
+ std::thread receiveDataThread;
+ std::mutex peersMutex;
+ bool alive;
};
}
diff --git a/include/Log.hpp b/include/Log.hpp
new file mode 100644
index 0000000..1301c5b
--- /dev/null
+++ b/include/Log.hpp
@@ -0,0 +1,12 @@
+#pragma once
+
+namespace sibs
+{
+ class Log
+ {
+ public:
+ static void debug(const char *fmt, ...);
+ static void warn(const char *fmt, ...);
+ static void error(const char *fmt, ...);
+ };
+}
diff --git a/project.conf b/project.conf
index 6560f79..3365c0c 100644
--- a/project.conf
+++ b/project.conf
@@ -1,5 +1,5 @@
[package]
-name = "udt-dht"
+name = "sibs-pubsub"
version = "0.1.0"
type = "static"
tests = "tests"
diff --git a/sibs-pubsub.kdev4 b/sibs-pubsub.kdev4
new file mode 100644
index 0000000..3c65984
--- /dev/null
+++ b/sibs-pubsub.kdev4
@@ -0,0 +1,4 @@
+[Project]
+CreatedFrom=
+Manager=KDevCustomBuildSystem
+Name=sibs-pubsub
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 12ae761..3e79658 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -1,4 +1,5 @@
#include "../include/DirectConnection.hpp"
+#include "../include/Log.hpp"
#include <cstdio>
#include <cstring>
@@ -13,6 +14,9 @@
namespace sibs
{
+ // Max received data size allowed when receiving regular data, receive data as file to receive more data
+ const int MAX_RECEIVED_DATA_SIZE = 1024 * 1024 * 1; // 1Mb
+
Ipv4::Ipv4(const char *ip, u16 port)
{
struct addrinfo hints = {};
@@ -37,33 +41,42 @@ namespace sibs
freeaddrinfo(address);
}
- DirectConnections::DirectConnections() :
- mySocket(0)
+ DirectConnections::DirectConnections(u16 _port) :
+ port(_port),
+ alive(true)
{
- try
- {
- init();
- }
- catch(...)
- {
- cleanup();
- }
+ UDT::startup();
+ eid = UDT::epoll_create();
+ receiveDataThread = std::thread(&DirectConnections::receiveData, this);
}
DirectConnections::~DirectConnections()
{
- cleanup();
+ alive = false;
+ receiveDataThread.join();
+
+ for(auto &peer : peers)
+ {
+ UDT::close(peer.first);
+ }
+ UDT::epoll_release(eid);
+ UDT::cleanup();
}
- void DirectConnections::init()
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDT::startup();
- eid = UDT::epoll_create();
- mySocket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM);
+ UDTSOCKET socket = UDT::socket(AI_PASSIVE, AF_INET, SOCK_STREAM);
+ if(socket == UDT::INVALID_SOCK)
+ {
+ std::string errMsg = "UDT: Failed to create socket, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw ConnectionException(errMsg);
+ }
+
bool rendezvous = true;
- UDT::setsockopt(mySocket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
+ UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
bool reuseAddr = true;
- UDT::setsockopt(mySocket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool));
+ UDT::setsockopt(socket, 0, UDT_REUSEADDR, &reuseAddr, sizeof(bool));
// Windows UDP issue
// For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold
@@ -74,39 +87,124 @@ namespace sibs
sockaddr_in myAddr = {};
myAddr.sin_family = AF_INET;
- myAddr.sin_port = htons(9000);
+ myAddr.sin_port = htons(port);
myAddr.sin_addr.s_addr = INADDR_ANY;
memset(&myAddr.sin_zero, '\0', 8);
- if(UDT::bind(mySocket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR)
+ if(UDT::bind(socket, (sockaddr*)&myAddr, sizeof(myAddr)) == UDT::ERROR)
{
- // TODO: Add ip and port to error
std::string errMsg = "UDT: Failed to bind, error: ";
- errMsg += UDT::getlasterror().getErrorMessage();
+ errMsg += UDT::getlasterror_desc();
throw ConnectionException(errMsg);
}
+
+ if(UDT::connect(socket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR)
+ {
+ std::string errMsg = "UDT: Failed to connect, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw ConnectionException(errMsg);
+ }
+
+ UDT::epoll_add_usock(eid, socket);
+ std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
+ peer->socket = socket;
+ peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
+ peersMutex.lock();
+ peers[socket] = peer;
+ peersMutex.unlock();
+ return peer;
}
- void DirectConnections::cleanup()
+ void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size)
{
- UDT::epoll_release(eid);
-
- if(mySocket != 0)
- UDT::close(mySocket);
+ usize sentSizeTotal = 0;
+ while(sentSizeTotal < size)
+ {
+ int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0);
+ if(sentSize == UDT::ERROR)
+ {
+ std::string errMsg = "UDT: Failed to send data, error: ";
+ errMsg += UDT::getlasterror_desc();
+ throw SendException(errMsg);
+ }
+ sentSizeTotal += sentSize;
+ }
+ }
+
+ void DirectConnections::receiveData()
+ {
+ std::vector<char> data;
+ data.reserve(MAX_RECEIVED_DATA_SIZE);
- UDT::cleanup();
+ std::set<UDTSOCKET> readfds;
+ while(alive)
+ {
+ int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 250);
+ if(numfsReady == 0)
+ continue;
+ else if(numfsReady == -1)
+ {
+ if(UDT::getlasterror_code() == UDT::ERRORINFO::ETIMEOUT)
+ continue;
+ else
+ {
+ Log::error("UDT: Stop receiving data, got error: %s", UDT::getlasterror_desc());
+ return;
+ }
+ }
+
+ for(UDTSOCKET receivedDataFromPeer : readfds)
+ {
+ bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data());
+ if(receivedData)
+ {
+ peersMutex.lock();
+ auto peer = peers[receivedDataFromPeer];
+ peersMutex.unlock();
+ try
+ {
+ if(peer->receiveDataCallbackFunc)
+ peer->receiveDataCallbackFunc(data.data(), data.size());
+ }
+ catch(std::exception &e)
+ {
+ Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what());
+ }
+ }
+ }
+ readfds.clear();
+ }
}
- void DirectConnections::connect(const Ipv4 &address)
+ bool DirectConnections::receiveDataFromPeer(const int socket, char *output)
{
- if(UDT::connect(mySocket, address.address->ai_addr, address.address->ai_addrlen) == UDT::ERROR)
+ usize receivedTotalSize = 0;
+ while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE)
{
- // TODO: Add ip and port to error
- std::string errMsg = "UDT: Failed to connect, error: ";
- errMsg += UDT::getlasterror().getErrorMessage();
- throw ConnectionException(errMsg);
+ usize dataAvailableSize;
+ int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
+ if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc());
+ return false;
+ }
+
+ int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0);
+ if(receivedSize == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc());
+ return false;
+ }
+
+ if(receivedSize == 0)
+ {
+ return true;
+ }
+
+ receivedTotalSize += dataAvailableSize;
}
- //UDT::epoll_add_usock(eid, 2);
- }
+ Log::error("UDT: Received too much data, ignoring...");
+ return false;
+ }
}
diff --git a/src/Log.cpp b/src/Log.cpp
new file mode 100644
index 0000000..df424f0
--- /dev/null
+++ b/src/Log.cpp
@@ -0,0 +1,43 @@
+#include "../include/Log.hpp"
+#include <cstdarg>
+#include <cstdio>
+#include <mutex>
+
+namespace sibs
+{
+ // TODO: Disable color if stdout/stderr is not tty
+ static std::mutex mutexLog;
+
+ void Log::debug(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;32mDebug:\033[0m ", stdout);
+ vfprintf(stdout, fmt, args);
+ fputs("\n", stdout);
+ va_end(args);
+ }
+
+ void Log::warn(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;33mWarning:\033[0m ", stdout);
+ vfprintf(stdout, fmt, args);
+ fputs("\n", stdout);
+ va_end(args);
+ }
+
+ void Log::error(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexLog);
+ va_list args;
+ va_start(args, fmt);
+ fputs("\033[1;31mError:\033[0m ", stderr);
+ vfprintf(stderr, fmt, args);
+ fputs("\n", stderr);
+ va_end(args);
+ }
+}
diff --git a/tests/main.cpp b/tests/main.cpp
index 63e50c3..6b9a36f 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -2,5 +2,6 @@
int main()
{
+ sibs::DirectConnections user1(27137);
return 0;
}