aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-06-07 22:00:42 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commitc2187ca6b61c701c281cc528db43f6b97c50f3d8 (patch)
treef0baf317846902ae628c2e12cf8c25b6eb235c77 /src
parented71e8adf36e3d0c3f6f2b54794fe069091d3376 (diff)
Add bootstrap node, listen method
Diffstat (limited to 'src')
-rw-r--r--src/BootstrapConnection.cpp59
-rw-r--r--src/BootstrapNode.cpp39
-rw-r--r--src/DirectConnection.cpp39
-rw-r--r--src/IpAddress.cpp16
-rw-r--r--src/PubsubKey.cpp27
5 files changed, 167 insertions, 13 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
new file mode 100644
index 0000000..7c75cde
--- /dev/null
+++ b/src/BootstrapConnection.cpp
@@ -0,0 +1,59 @@
+#include "../include/sibs/BootstrapConnection.hpp"
+#include "../include/Log.hpp"
+#include <sibs/SafeDeserializer.hpp>
+
+namespace sibs
+{
+ BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress)
+ {
+ serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }
+
+ // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key
+ void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ {
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+
+ auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
+ if(listenerFuncIt == listenCallbackFuncs.end())
+ {
+ Log::debug("BoostrapConnection: No listener found for key XXX, ignoring...");
+ return;
+ }
+
+ while(!deserializer.empty())
+ {
+ sa_family_t addressFamily = deserializer.extract<u32>();
+ if(addressFamily == AF_INET)
+ {
+ in_addr_t ipv4Address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u32>();
+ Ipv4 newPeer;
+ newPeer.address.sin_family = addressFamily;
+ newPeer.address.sin_addr.s_addr = ipv4Address;
+ newPeer.address.sin_port = port;
+ memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero));
+ // TODO: Move connection to thread and add callback function, just like @receiveData and @send
+ connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }
+ else
+ Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
+ }
+ }
+
+ void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ {
+ if(listenCallbackFunc)
+ listenCallbackFunc(data, size);
+ }
+
+ void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
+ {
+ if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
+ throw PubsubKeyAlreadyListeningException("");
+ listenCallbackFuncs[pubsubKey] = callbackFunc;
+ connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.data(), pubsubKey.data.data()));
+ }
+}
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index e26b9ea..fb10dcc 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -8,6 +8,8 @@
#include <ws2tcpip.h>
#endif
#include <udt/udt.h>
+#include <sibs/SafeSerializer.hpp>
+#include <sibs/SafeDeserializer.hpp>
namespace sibs
{
@@ -56,10 +58,45 @@ namespace sibs
UDT::epoll_add_usock(connections.eid, clientSocket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = clientSocket;
- peer->receiveDataCallbackFunc = nullptr;
+ sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr;
+ memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
+ peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
connections.peersMutex.lock();
connections.peers[clientSocket] = peer;
connections.peersMutex.unlock();
}
}
+
+ void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
+ {
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto &peer : peers)
+ {
+ if(peer->address.address.sin_addr.s_addr == newPeer->address.address.sin_addr.s_addr)
+ return;
+ }
+
+ sibs::SafeSerializer serializer;
+ serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
+ serializer.add((u32)newPeer->address.address.sin_family);
+ serializer.add((u32)newPeer->address.address.sin_addr.s_addr);
+ serializer.add((u16)newPeer->address.address.sin_port);
+ std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));
+
+ sibs::SafeSerializer newPeerSerializer;
+ newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
+ for(auto &peer : peers)
+ {
+ connections.send(peer, serializerData);
+ newPeerSerializer.add((u32)peer->address.address.sin_family);
+ newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr);
+ newPeerSerializer.add((u16)peer->address.address.sin_port);
+ }
+ peers.push_back(newPeer);
+ connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())));
+ }
}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 86b1aa3..6d9fe27 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -70,9 +70,19 @@ namespace sibs
return socket;
}
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ return connect(address, false, true, receiveDataCallbackFunc);
+ }
+
std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true);
+ return connect(address, true, true, receiveDataCallbackFunc);
+ }
+
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr);
if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
@@ -85,6 +95,7 @@ namespace sibs
UDT::epoll_add_usock(eid, socket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = socket;
+ peer->address = address;
peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
peersMutex.lock();
peers[socket] = peer;
@@ -92,20 +103,24 @@ namespace sibs
return peer;
}
- void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size)
+ void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc)
{
- usize sentSizeTotal = 0;
- while(sentSizeTotal < size)
+ // TODO: Replace this with light-weight threads (fibers)?
+ std::thread([peer, data, sendDataCallbackFunc]()
{
- int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0);
- if(sentSize == UDT::ERROR)
+ usize sentSizeTotal = 0;
+ while(sentSizeTotal < data->size())
{
- std::string errMsg = "UDT: Failed to send data, error: ";
- errMsg += UDT::getlasterror_desc();
- throw SendException(errMsg);
+ int sentSize = UDT::send(peer->socket, (char*)data->data() + sentSizeTotal, data->size() - sentSizeTotal, 0);
+ if(sentSize == UDT::ERROR)
+ {
+ if(sendDataCallbackFunc)
+ sendDataCallbackFunc(PubSubResult::ERROR, UDT::getlasterror_desc());
+ }
+ sentSizeTotal += sentSize;
}
- sentSizeTotal += sentSize;
- }
+ sendDataCallbackFunc(PubSubResult::OK, "");
+ }).detach();
}
void DirectConnections::receiveData()
@@ -141,7 +156,7 @@ namespace sibs
try
{
if(peer->receiveDataCallbackFunc)
- peer->receiveDataCallbackFunc(data.data(), data.size());
+ peer->receiveDataCallbackFunc(peer, data.data(), data.size());
}
catch(std::exception &e)
{
diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp
index 22e81e5..38f758d 100644
--- a/src/IpAddress.cpp
+++ b/src/IpAddress.cpp
@@ -3,6 +3,11 @@
namespace sibs
{
+ Ipv4::Ipv4() : Ipv4(nullptr, 0)
+ {
+
+ }
+
Ipv4::Ipv4(const char *ip, unsigned short port)
{
address.sin_family = AF_INET;
@@ -25,6 +30,17 @@ namespace sibs
memset(address.sin_zero, 0, sizeof(address.sin_zero));
}
+ Ipv4::Ipv4(const Ipv4 &other)
+ {
+ memcpy(&address, &other.address, sizeof(address));
+ }
+
+ Ipv4& Ipv4::operator = (const Ipv4 &other)
+ {
+ memcpy(&address, &other.address, sizeof(address));
+ return *this;
+ }
+
std::string Ipv4::getAddress() const
{
std::string result;
diff --git a/src/PubsubKey.cpp b/src/PubsubKey.cpp
new file mode 100644
index 0000000..dd807ce
--- /dev/null
+++ b/src/PubsubKey.cpp
@@ -0,0 +1,27 @@
+#include "../include/sibs/PubsubKey.hpp"
+
+namespace sibs
+{
+ PubsubKey::PubsubKey() :
+ data({})
+ {
+
+ }
+
+ PubsubKey::PubsubKey(const void *data, const usize size)
+ {
+ std::copy((char*)data, (char*)data + std::min(size, PUBSUB_KEY_LENGTH), this->data.begin());
+ if(size < PUBSUB_KEY_LENGTH)
+ std::fill_n((char*)data + size, PUBSUB_KEY_LENGTH - size, 0);
+ }
+
+ bool PubsubKey::operator == (const PubsubKey &other) const
+ {
+ return data == other.data;
+ }
+
+ bool PubsubKey::operator != (const PubsubKey &other) const
+ {
+ return data != other.data;
+ }
+}