diff options
-rw-r--r-- | include/sibs/BootstrapConnection.hpp | 33 | ||||
-rw-r--r-- | include/sibs/BootstrapNode.hpp | 4 | ||||
-rw-r--r-- | include/sibs/DirectConnection.hpp | 24 | ||||
-rw-r--r-- | include/sibs/IpAddress.hpp | 5 | ||||
-rw-r--r-- | include/sibs/PubsubKey.hpp | 42 | ||||
-rw-r--r-- | project.conf | 1 | ||||
-rw-r--r-- | src/BootstrapConnection.cpp | 59 | ||||
-rw-r--r-- | src/BootstrapNode.cpp | 39 | ||||
-rw-r--r-- | src/DirectConnection.cpp | 39 | ||||
-rw-r--r-- | src/IpAddress.cpp | 16 | ||||
-rw-r--r-- | src/PubsubKey.cpp | 27 |
11 files changed, 262 insertions, 27 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp new file mode 100644 index 0000000..964d777 --- /dev/null +++ b/include/sibs/BootstrapConnection.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "../utils.hpp" +#include "DirectConnection.hpp" +#include "PubsubKey.hpp" + +namespace sibs +{ + class PubsubKeyAlreadyListeningException : public std::runtime_error + { + public: + PubsubKeyAlreadyListeningException(const std::string &errMsg) : std::runtime_error(errMsg) {} + }; + + using BoostrapConnectionListenCallbackFunc = std::function<void(const void *data, const usize size)>; + + class BootstrapConnection + { + DISABLE_COPY(BootstrapConnection) + public: + BootstrapConnection(const Ipv4 &bootstrapAddress); + + // Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey + void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); + private: + void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); + void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); + private: + DirectConnections connections; + std::shared_ptr<DirectConnectionPeer> serverPeer; + PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs; + }; +} diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp index eaf48eb..13a62e1 100644 --- a/include/sibs/BootstrapNode.hpp +++ b/include/sibs/BootstrapNode.hpp @@ -2,6 +2,8 @@ #include "DirectConnection.hpp" #include "IpAddress.hpp" +#include "PubsubKey.hpp" +#include <vector> namespace sibs { @@ -20,9 +22,11 @@ namespace sibs ~BootstrapNode(); private: void acceptConnections(); + void peerSubscribe(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); private: DirectConnections connections; int socket; std::thread acceptConnectionsThread; + PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers; }; } diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index c4431a5..0e8b961 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -7,6 +7,7 @@ #include <memory> #include <thread> #include <mutex> +#include <vector> #include "IpAddress.hpp" #include "../types.hpp" #include "../utils.hpp" @@ -19,25 +20,22 @@ namespace sibs ConnectionException(const std::string &errMsg) : std::runtime_error(errMsg) {} }; - class SendException : public std::runtime_error - { - public: - SendException(const std::string &errMsg) : std::runtime_error(errMsg) {} - }; - - - enum class PubSubConnectResult + enum class PubSubResult { OK, ERROR }; - using PubSubConnectCallback = std::function<void(PubSubConnectResult result, const std::string &resultStr)>; - using PubSubReceiveDataCallback = std::function<void(const void *data, const usize size)>; + struct DirectConnectionPeer; + + using PubSubConnectCallback = std::function<void(PubSubResult result, const std::string &resultStr)>; + using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>; + using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>; struct DirectConnectionPeer { int socket; + Ipv4 address; PubSubReceiveDataCallback receiveDataCallbackFunc; }; @@ -50,12 +48,14 @@ namespace sibs ~DirectConnections(); // Throws ConnectionException on error + std::shared_ptr<DirectConnectionPeer> connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc); + // Throws ConnectionException on error std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc); - // Throws SendException on error - void send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size); + void send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); protected: int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr); private: + std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc); void receiveData(); bool receiveDataFromPeer(const int socket, char *output); private: diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp index a0fad75..7b757f4 100644 --- a/include/sibs/IpAddress.hpp +++ b/include/sibs/IpAddress.hpp @@ -1,6 +1,5 @@ #pragma once -#include "../utils.hpp" #include <stdexcept> #include <string> #ifndef WIN32 @@ -21,11 +20,13 @@ namespace sibs class Ipv4 { - DISABLE_COPY(Ipv4) public: + Ipv4(); // If @ip is nullptr, then bind to all available sockets (typical for servers) // Throws InvalidAddressException on error. Ipv4(const char *ip, unsigned short port); + Ipv4(const Ipv4 &other); + Ipv4& operator = (const Ipv4 &other); std::string getAddress() const; unsigned short getPort() const; diff --git a/include/sibs/PubsubKey.hpp b/include/sibs/PubsubKey.hpp new file mode 100644 index 0000000..d123331 --- /dev/null +++ b/include/sibs/PubsubKey.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include "../types.hpp" +#include <array> +#include <unordered_map> + +namespace sibs +{ + const usize PUBSUB_KEY_LENGTH = 32; + + // Source: https://stackoverflow.com/a/11414104 (public license) + static size_t fnvHash(const unsigned char *key, int len) + { + size_t h = 2166136261; + for (int i = 0; i < len; i++) + h = (h * 16777619) ^ key[i]; + return h; + } + + class PubsubKey + { + public: + PubsubKey(); + // Create pubsub key from existing data, data will be cutoff at PUBSUB_KEY_LENGTH. If @size is less than PUBSUB_KEY_LENGTH then data is appended with 0 + PubsubKey(const void *data, const usize size); + bool operator == (const PubsubKey &other) const; + bool operator != (const PubsubKey &other) const; + + std::array<u8, PUBSUB_KEY_LENGTH> data; + }; + + struct PubsubKeyHasher + { + size_t operator()(const PubsubKey &pubsubKey) const + { + return fnvHash((const unsigned char*)pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + } + }; + + template <typename ValueType> + using PubsubKeyMap = std::unordered_map<PubsubKey, ValueType, PubsubKeyHasher>; +} diff --git a/project.conf b/project.conf index 3365c0c..403141a 100644 --- a/project.conf +++ b/project.conf @@ -10,3 +10,4 @@ expose_include_dirs = ["include"] [dependencies] udt = "4.11" +sibs-serializer = "1.0.1" diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp new file mode 100644 index 0000000..7c75cde --- /dev/null +++ b/src/BootstrapConnection.cpp @@ -0,0 +1,59 @@ +#include "../include/sibs/BootstrapConnection.hpp" +#include "../include/Log.hpp" +#include <sibs/SafeDeserializer.hpp> + +namespace sibs +{ + BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) + { + serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + } + + // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key + void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size) + { + sibs::SafeDeserializer deserializer((const u8*)data, size); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + + auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); + if(listenerFuncIt == listenCallbackFuncs.end()) + { + Log::debug("BoostrapConnection: No listener found for key XXX, ignoring..."); + return; + } + + while(!deserializer.empty()) + { + sa_family_t addressFamily = deserializer.extract<u32>(); + if(addressFamily == AF_INET) + { + in_addr_t ipv4Address = deserializer.extract<u32>(); + u16 port = deserializer.extract<u32>(); + Ipv4 newPeer; + newPeer.address.sin_family = addressFamily; + newPeer.address.sin_addr.s_addr = ipv4Address; + newPeer.address.sin_port = port; + memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero)); + // TODO: Move connection to thread and add callback function, just like @receiveData and @send + connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + } + else + Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); + } + } + + void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size) + { + if(listenCallbackFunc) + listenCallbackFunc(data, size); + } + + void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + { + if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end()) + throw PubsubKeyAlreadyListeningException(""); + listenCallbackFuncs[pubsubKey] = callbackFunc; + connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.data(), pubsubKey.data.data())); + } +} diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index e26b9ea..fb10dcc 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -8,6 +8,8 @@ #include <ws2tcpip.h> #endif #include <udt/udt.h> +#include <sibs/SafeSerializer.hpp> +#include <sibs/SafeDeserializer.hpp> namespace sibs { @@ -56,10 +58,45 @@ namespace sibs UDT::epoll_add_usock(connections.eid, clientSocket); std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>(); peer->socket = clientSocket; - peer->receiveDataCallbackFunc = nullptr; + sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr; + memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address)); + peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); connections.peersMutex.lock(); connections.peers[clientSocket] = peer; connections.peersMutex.unlock(); } } + + void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size) + { + sibs::SafeDeserializer deserializer((const u8*)data, size); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + + auto &peers = subscribedPeers[pubsubKey]; + for(auto &peer : peers) + { + if(peer->address.address.sin_addr.s_addr == newPeer->address.address.sin_addr.s_addr) + return; + } + + sibs::SafeSerializer serializer; + serializer.add(pubsubKey.data.data(), pubsubKey.data.size()); + serializer.add((u32)newPeer->address.address.sin_family); + serializer.add((u32)newPeer->address.address.sin_addr.s_addr); + serializer.add((u16)newPeer->address.address.sin_port); + std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer())); + + sibs::SafeSerializer newPeerSerializer; + newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size()); + for(auto &peer : peers) + { + connections.send(peer, serializerData); + newPeerSerializer.add((u32)peer->address.address.sin_family); + newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr); + newPeerSerializer.add((u16)peer->address.address.sin_port); + } + peers.push_back(newPeer); + connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer()))); + } } diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 86b1aa3..6d9fe27 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -70,9 +70,19 @@ namespace sibs return socket; } + std::shared_ptr<DirectConnectionPeer> DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + { + return connect(address, false, true, receiveDataCallbackFunc); + } + std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) { - UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true); + return connect(address, true, true, receiveDataCallbackFunc); + } + + std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc) + { + UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr); if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { @@ -85,6 +95,7 @@ namespace sibs UDT::epoll_add_usock(eid, socket); std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>(); peer->socket = socket; + peer->address = address; peer->receiveDataCallbackFunc = receiveDataCallbackFunc; peersMutex.lock(); peers[socket] = peer; @@ -92,20 +103,24 @@ namespace sibs return peer; } - void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size) + void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc) { - usize sentSizeTotal = 0; - while(sentSizeTotal < size) + // TODO: Replace this with light-weight threads (fibers)? + std::thread([peer, data, sendDataCallbackFunc]() { - int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0); - if(sentSize == UDT::ERROR) + usize sentSizeTotal = 0; + while(sentSizeTotal < data->size()) { - std::string errMsg = "UDT: Failed to send data, error: "; - errMsg += UDT::getlasterror_desc(); - throw SendException(errMsg); + int sentSize = UDT::send(peer->socket, (char*)data->data() + sentSizeTotal, data->size() - sentSizeTotal, 0); + if(sentSize == UDT::ERROR) + { + if(sendDataCallbackFunc) + sendDataCallbackFunc(PubSubResult::ERROR, UDT::getlasterror_desc()); + } + sentSizeTotal += sentSize; } - sentSizeTotal += sentSize; - } + sendDataCallbackFunc(PubSubResult::OK, ""); + }).detach(); } void DirectConnections::receiveData() @@ -141,7 +156,7 @@ namespace sibs try { if(peer->receiveDataCallbackFunc) - peer->receiveDataCallbackFunc(data.data(), data.size()); + peer->receiveDataCallbackFunc(peer, data.data(), data.size()); } catch(std::exception &e) { diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp index 22e81e5..38f758d 100644 --- a/src/IpAddress.cpp +++ b/src/IpAddress.cpp @@ -3,6 +3,11 @@ namespace sibs { + Ipv4::Ipv4() : Ipv4(nullptr, 0) + { + + } + Ipv4::Ipv4(const char *ip, unsigned short port) { address.sin_family = AF_INET; @@ -25,6 +30,17 @@ namespace sibs memset(address.sin_zero, 0, sizeof(address.sin_zero)); } + Ipv4::Ipv4(const Ipv4 &other) + { + memcpy(&address, &other.address, sizeof(address)); + } + + Ipv4& Ipv4::operator = (const Ipv4 &other) + { + memcpy(&address, &other.address, sizeof(address)); + return *this; + } + std::string Ipv4::getAddress() const { std::string result; diff --git a/src/PubsubKey.cpp b/src/PubsubKey.cpp new file mode 100644 index 0000000..dd807ce --- /dev/null +++ b/src/PubsubKey.cpp @@ -0,0 +1,27 @@ +#include "../include/sibs/PubsubKey.hpp" + +namespace sibs +{ + PubsubKey::PubsubKey() : + data({}) + { + + } + + PubsubKey::PubsubKey(const void *data, const usize size) + { + std::copy((char*)data, (char*)data + std::min(size, PUBSUB_KEY_LENGTH), this->data.begin()); + if(size < PUBSUB_KEY_LENGTH) + std::fill_n((char*)data + size, PUBSUB_KEY_LENGTH - size, 0); + } + + bool PubsubKey::operator == (const PubsubKey &other) const + { + return data == other.data; + } + + bool PubsubKey::operator != (const PubsubKey &other) const + { + return data != other.data; + } +} |