aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-06-07 22:00:42 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commitc2187ca6b61c701c281cc528db43f6b97c50f3d8 (patch)
treef0baf317846902ae628c2e12cf8c25b6eb235c77
parented71e8adf36e3d0c3f6f2b54794fe069091d3376 (diff)
Add bootstrap node, listen method
-rw-r--r--include/sibs/BootstrapConnection.hpp33
-rw-r--r--include/sibs/BootstrapNode.hpp4
-rw-r--r--include/sibs/DirectConnection.hpp24
-rw-r--r--include/sibs/IpAddress.hpp5
-rw-r--r--include/sibs/PubsubKey.hpp42
-rw-r--r--project.conf1
-rw-r--r--src/BootstrapConnection.cpp59
-rw-r--r--src/BootstrapNode.cpp39
-rw-r--r--src/DirectConnection.cpp39
-rw-r--r--src/IpAddress.cpp16
-rw-r--r--src/PubsubKey.cpp27
11 files changed, 262 insertions, 27 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp
new file mode 100644
index 0000000..964d777
--- /dev/null
+++ b/include/sibs/BootstrapConnection.hpp
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "../utils.hpp"
+#include "DirectConnection.hpp"
+#include "PubsubKey.hpp"
+
+namespace sibs
+{
+ class PubsubKeyAlreadyListeningException : public std::runtime_error
+ {
+ public:
+ PubsubKeyAlreadyListeningException(const std::string &errMsg) : std::runtime_error(errMsg) {}
+ };
+
+ using BoostrapConnectionListenCallbackFunc = std::function<void(const void *data, const usize size)>;
+
+ class BootstrapConnection
+ {
+ DISABLE_COPY(BootstrapConnection)
+ public:
+ BootstrapConnection(const Ipv4 &bootstrapAddress);
+
+ // Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey
+ void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc);
+ private:
+ void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
+ void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
+ private:
+ DirectConnections connections;
+ std::shared_ptr<DirectConnectionPeer> serverPeer;
+ PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs;
+ };
+}
diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp
index eaf48eb..13a62e1 100644
--- a/include/sibs/BootstrapNode.hpp
+++ b/include/sibs/BootstrapNode.hpp
@@ -2,6 +2,8 @@
#include "DirectConnection.hpp"
#include "IpAddress.hpp"
+#include "PubsubKey.hpp"
+#include <vector>
namespace sibs
{
@@ -20,9 +22,11 @@ namespace sibs
~BootstrapNode();
private:
void acceptConnections();
+ void peerSubscribe(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
private:
DirectConnections connections;
int socket;
std::thread acceptConnectionsThread;
+ PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
};
}
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index c4431a5..0e8b961 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -7,6 +7,7 @@
#include <memory>
#include <thread>
#include <mutex>
+#include <vector>
#include "IpAddress.hpp"
#include "../types.hpp"
#include "../utils.hpp"
@@ -19,25 +20,22 @@ namespace sibs
ConnectionException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
- class SendException : public std::runtime_error
- {
- public:
- SendException(const std::string &errMsg) : std::runtime_error(errMsg) {}
- };
-
-
- enum class PubSubConnectResult
+ enum class PubSubResult
{
OK,
ERROR
};
- using PubSubConnectCallback = std::function<void(PubSubConnectResult result, const std::string &resultStr)>;
- using PubSubReceiveDataCallback = std::function<void(const void *data, const usize size)>;
+ struct DirectConnectionPeer;
+
+ using PubSubConnectCallback = std::function<void(PubSubResult result, const std::string &resultStr)>;
+ using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>;
+ using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>;
struct DirectConnectionPeer
{
int socket;
+ Ipv4 address;
PubSubReceiveDataCallback receiveDataCallbackFunc;
};
@@ -50,12 +48,14 @@ namespace sibs
~DirectConnections();
// Throws ConnectionException on error
+ std::shared_ptr<DirectConnectionPeer> connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc);
+ // Throws ConnectionException on error
std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc);
- // Throws SendException on error
- void send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size);
+ void send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr);
protected:
int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr);
private:
+ std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc);
void receiveData();
bool receiveDataFromPeer(const int socket, char *output);
private:
diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp
index a0fad75..7b757f4 100644
--- a/include/sibs/IpAddress.hpp
+++ b/include/sibs/IpAddress.hpp
@@ -1,6 +1,5 @@
#pragma once
-#include "../utils.hpp"
#include <stdexcept>
#include <string>
#ifndef WIN32
@@ -21,11 +20,13 @@ namespace sibs
class Ipv4
{
- DISABLE_COPY(Ipv4)
public:
+ Ipv4();
// If @ip is nullptr, then bind to all available sockets (typical for servers)
// Throws InvalidAddressException on error.
Ipv4(const char *ip, unsigned short port);
+ Ipv4(const Ipv4 &other);
+ Ipv4& operator = (const Ipv4 &other);
std::string getAddress() const;
unsigned short getPort() const;
diff --git a/include/sibs/PubsubKey.hpp b/include/sibs/PubsubKey.hpp
new file mode 100644
index 0000000..d123331
--- /dev/null
+++ b/include/sibs/PubsubKey.hpp
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "../types.hpp"
+#include <array>
+#include <unordered_map>
+
+namespace sibs
+{
+ const usize PUBSUB_KEY_LENGTH = 32;
+
+ // Source: https://stackoverflow.com/a/11414104 (public license)
+ static size_t fnvHash(const unsigned char *key, int len)
+ {
+ size_t h = 2166136261;
+ for (int i = 0; i < len; i++)
+ h = (h * 16777619) ^ key[i];
+ return h;
+ }
+
+ class PubsubKey
+ {
+ public:
+ PubsubKey();
+ // Create pubsub key from existing data, data will be cutoff at PUBSUB_KEY_LENGTH. If @size is less than PUBSUB_KEY_LENGTH then data is appended with 0
+ PubsubKey(const void *data, const usize size);
+ bool operator == (const PubsubKey &other) const;
+ bool operator != (const PubsubKey &other) const;
+
+ std::array<u8, PUBSUB_KEY_LENGTH> data;
+ };
+
+ struct PubsubKeyHasher
+ {
+ size_t operator()(const PubsubKey &pubsubKey) const
+ {
+ return fnvHash((const unsigned char*)pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ }
+ };
+
+ template <typename ValueType>
+ using PubsubKeyMap = std::unordered_map<PubsubKey, ValueType, PubsubKeyHasher>;
+}
diff --git a/project.conf b/project.conf
index 3365c0c..403141a 100644
--- a/project.conf
+++ b/project.conf
@@ -10,3 +10,4 @@ expose_include_dirs = ["include"]
[dependencies]
udt = "4.11"
+sibs-serializer = "1.0.1"
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
new file mode 100644
index 0000000..7c75cde
--- /dev/null
+++ b/src/BootstrapConnection.cpp
@@ -0,0 +1,59 @@
+#include "../include/sibs/BootstrapConnection.hpp"
+#include "../include/Log.hpp"
+#include <sibs/SafeDeserializer.hpp>
+
+namespace sibs
+{
+ BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress)
+ {
+ serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }
+
+ // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key
+ void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ {
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+
+ auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
+ if(listenerFuncIt == listenCallbackFuncs.end())
+ {
+ Log::debug("BoostrapConnection: No listener found for key XXX, ignoring...");
+ return;
+ }
+
+ while(!deserializer.empty())
+ {
+ sa_family_t addressFamily = deserializer.extract<u32>();
+ if(addressFamily == AF_INET)
+ {
+ in_addr_t ipv4Address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u32>();
+ Ipv4 newPeer;
+ newPeer.address.sin_family = addressFamily;
+ newPeer.address.sin_addr.s_addr = ipv4Address;
+ newPeer.address.sin_port = port;
+ memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero));
+ // TODO: Move connection to thread and add callback function, just like @receiveData and @send
+ connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }
+ else
+ Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
+ }
+ }
+
+ void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ {
+ if(listenCallbackFunc)
+ listenCallbackFunc(data, size);
+ }
+
+ void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
+ {
+ if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
+ throw PubsubKeyAlreadyListeningException("");
+ listenCallbackFuncs[pubsubKey] = callbackFunc;
+ connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.data(), pubsubKey.data.data()));
+ }
+}
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index e26b9ea..fb10dcc 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -8,6 +8,8 @@
#include <ws2tcpip.h>
#endif
#include <udt/udt.h>
+#include <sibs/SafeSerializer.hpp>
+#include <sibs/SafeDeserializer.hpp>
namespace sibs
{
@@ -56,10 +58,45 @@ namespace sibs
UDT::epoll_add_usock(connections.eid, clientSocket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = clientSocket;
- peer->receiveDataCallbackFunc = nullptr;
+ sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr;
+ memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
+ peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
connections.peersMutex.lock();
connections.peers[clientSocket] = peer;
connections.peersMutex.unlock();
}
}
+
+ void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
+ {
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto &peer : peers)
+ {
+ if(peer->address.address.sin_addr.s_addr == newPeer->address.address.sin_addr.s_addr)
+ return;
+ }
+
+ sibs::SafeSerializer serializer;
+ serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
+ serializer.add((u32)newPeer->address.address.sin_family);
+ serializer.add((u32)newPeer->address.address.sin_addr.s_addr);
+ serializer.add((u16)newPeer->address.address.sin_port);
+ std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));
+
+ sibs::SafeSerializer newPeerSerializer;
+ newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
+ for(auto &peer : peers)
+ {
+ connections.send(peer, serializerData);
+ newPeerSerializer.add((u32)peer->address.address.sin_family);
+ newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr);
+ newPeerSerializer.add((u16)peer->address.address.sin_port);
+ }
+ peers.push_back(newPeer);
+ connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())));
+ }
}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 86b1aa3..6d9fe27 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -70,9 +70,19 @@ namespace sibs
return socket;
}
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ return connect(address, false, true, receiveDataCallbackFunc);
+ }
+
std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true);
+ return connect(address, true, true, receiveDataCallbackFunc);
+ }
+
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr);
if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
@@ -85,6 +95,7 @@ namespace sibs
UDT::epoll_add_usock(eid, socket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = socket;
+ peer->address = address;
peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
peersMutex.lock();
peers[socket] = peer;
@@ -92,20 +103,24 @@ namespace sibs
return peer;
}
- void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size)
+ void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc)
{
- usize sentSizeTotal = 0;
- while(sentSizeTotal < size)
+ // TODO: Replace this with light-weight threads (fibers)?
+ std::thread([peer, data, sendDataCallbackFunc]()
{
- int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0);
- if(sentSize == UDT::ERROR)
+ usize sentSizeTotal = 0;
+ while(sentSizeTotal < data->size())
{
- std::string errMsg = "UDT: Failed to send data, error: ";
- errMsg += UDT::getlasterror_desc();
- throw SendException(errMsg);
+ int sentSize = UDT::send(peer->socket, (char*)data->data() + sentSizeTotal, data->size() - sentSizeTotal, 0);
+ if(sentSize == UDT::ERROR)
+ {
+ if(sendDataCallbackFunc)
+ sendDataCallbackFunc(PubSubResult::ERROR, UDT::getlasterror_desc());
+ }
+ sentSizeTotal += sentSize;
}
- sentSizeTotal += sentSize;
- }
+ sendDataCallbackFunc(PubSubResult::OK, "");
+ }).detach();
}
void DirectConnections::receiveData()
@@ -141,7 +156,7 @@ namespace sibs
try
{
if(peer->receiveDataCallbackFunc)
- peer->receiveDataCallbackFunc(data.data(), data.size());
+ peer->receiveDataCallbackFunc(peer, data.data(), data.size());
}
catch(std::exception &e)
{
diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp
index 22e81e5..38f758d 100644
--- a/src/IpAddress.cpp
+++ b/src/IpAddress.cpp
@@ -3,6 +3,11 @@
namespace sibs
{
+ Ipv4::Ipv4() : Ipv4(nullptr, 0)
+ {
+
+ }
+
Ipv4::Ipv4(const char *ip, unsigned short port)
{
address.sin_family = AF_INET;
@@ -25,6 +30,17 @@ namespace sibs
memset(address.sin_zero, 0, sizeof(address.sin_zero));
}
+ Ipv4::Ipv4(const Ipv4 &other)
+ {
+ memcpy(&address, &other.address, sizeof(address));
+ }
+
+ Ipv4& Ipv4::operator = (const Ipv4 &other)
+ {
+ memcpy(&address, &other.address, sizeof(address));
+ return *this;
+ }
+
std::string Ipv4::getAddress() const
{
std::string result;
diff --git a/src/PubsubKey.cpp b/src/PubsubKey.cpp
new file mode 100644
index 0000000..dd807ce
--- /dev/null
+++ b/src/PubsubKey.cpp
@@ -0,0 +1,27 @@
+#include "../include/sibs/PubsubKey.hpp"
+
+namespace sibs
+{
+ PubsubKey::PubsubKey() :
+ data({})
+ {
+
+ }
+
+ PubsubKey::PubsubKey(const void *data, const usize size)
+ {
+ std::copy((char*)data, (char*)data + std::min(size, PUBSUB_KEY_LENGTH), this->data.begin());
+ if(size < PUBSUB_KEY_LENGTH)
+ std::fill_n((char*)data + size, PUBSUB_KEY_LENGTH - size, 0);
+ }
+
+ bool PubsubKey::operator == (const PubsubKey &other) const
+ {
+ return data == other.data;
+ }
+
+ bool PubsubKey::operator != (const PubsubKey &other) const
+ {
+ return data != other.data;
+ }
+}