aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-11-23 15:44:08 +0100
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commit1167a78fd14b8a4a421378c8815a4992e06d11cd (patch)
tree3167162354e19e28d4475ddf9362160f4275e050
parentb25c227f2357e351d2c1653fdfdcd1f099ac7f66 (diff)
Add data routing for failed p2p connections
-rw-r--r--include/sibs/DirectConnection.hpp1
-rw-r--r--include/sibs/IpAddress.hpp2
-rw-r--r--include/sibs/Message.hpp12
-rw-r--r--src/BootstrapConnection.cpp66
-rw-r--r--src/BootstrapNode.cpp47
-rw-r--r--src/DirectConnection.cpp48
-rw-r--r--src/IpAddress.cpp8
-rw-r--r--src/Message.cpp4
8 files changed, 149 insertions, 39 deletions
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index eb44153..70eaf4f 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -56,6 +56,7 @@ namespace sibs
PubSubReceiveDataCallback receiveDataCallbackFunc;
int sharedKeys = 0;
PeerType type = PeerType::CLIENT;
+ bool routed = false;
bool operator == (const DirectConnectionPeer &other) const;
bool operator != (const DirectConnectionPeer &other) const;
diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp
index 54cb58a..ea2a7bd 100644
--- a/include/sibs/IpAddress.hpp
+++ b/include/sibs/IpAddress.hpp
@@ -4,6 +4,7 @@
#include <string>
#include <unordered_map>
#include "../env.hpp"
+#include "../types.hpp"
#if OS_FAMILY != OS_FAMILY_WINDOWS
#include <arpa/inet.h>
#include <netdb.h>
@@ -32,6 +33,7 @@ namespace sibs
// If @ip is nullptr, then bind to all available sockets (typical for servers)
// Throws InvalidAddressException on error.
Ipv4(const char *ip, unsigned short port);
+ Ipv4(u16 family, u32 address, u16 port);
Ipv4(const Ipv4 &other);
Ipv4& operator = (const Ipv4 &other);
diff --git a/include/sibs/Message.hpp b/include/sibs/Message.hpp
index b6eb858..4591221 100644
--- a/include/sibs/Message.hpp
+++ b/include/sibs/Message.hpp
@@ -1,7 +1,7 @@
#pragma once
#include "../types.hpp"
-#include <vector>
+#include <sibs/SafeSerializer.hpp>
namespace sibs
{
@@ -20,11 +20,11 @@ namespace sibs
void append(const void *data, const usize size);
- usize getDataSize() const { return rawData.size() - 1; }
- usize getRawSize() const { return rawData.size(); }
+ usize getDataSize() const { return serializer.getSize() - 1; }
+ usize getRawSize() const { return serializer.getSize(); }
- const u8* data() const { return rawData.data(); }
- private:
- std::vector<u8> rawData;
+ const u8* data() { return serializer.getBuffer().data(); }
+
+ sibs::SafeSerializer serializer;
};
} \ No newline at end of file
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 4ace97c..d914646 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -114,11 +114,7 @@ namespace sibs
{
in_addr_t ipv4Address = deserializer.extract<u32>();
u16 port = deserializer.extract<u16>();
- Ipv4 newPeerAddress;
- newPeerAddress.address.sin_family = addressFamily;
- newPeerAddress.address.sin_addr.s_addr = ipv4Address;
- newPeerAddress.address.sin_port = port;
- memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
+ Ipv4 newPeerAddress(addressFamily, ipv4Address, port);
Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort());
connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)
{
@@ -132,7 +128,6 @@ namespace sibs
return;
}
subscribeDataIt->second.peers.push_back(peer);
- ++peer->sharedKeys;
Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
}
else
@@ -149,11 +144,18 @@ namespace sibs
void BootstrapConnection::receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
- if(size < PUBSUB_KEY_LENGTH)
- return;
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ Ipv4 targetAddress;
+ if(peer == serverPeer)
+ {
+ u16 addressType = deserializer.extract<u16>();
+ u32 address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u16>();
+ targetAddress = Ipv4(addressType, address, port);
+ }
PubsubKey pubsubKey;
- memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH);
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
if(messageType == MessageType::DATA)
{
subscribeDataMutex.lock();
@@ -165,11 +167,30 @@ namespace sibs
return;
}
auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc;
+ if(peer == serverPeer)
+ {
+ for(auto pubsubPeer : listenerFuncIt->second.peers)
+ {
+ if(pubsubPeer->address == targetAddress)
+ {
+ peer = pubsubPeer;
+ Log::debug("BootstrapConnection::receiveDataFromPeer: Received routed data from server, originator peer: ip: %s, port: %d", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
+ break;
+ }
+ }
+
+ if(peer == serverPeer)
+ {
+ Log::warn("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ subscribeDataMutex.unlock();
+ return;
+ }
+ }
subscribeDataMutex.unlock();
if(listenCallbackFunc)
{
- bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH);
+ bool continueListening = listenCallbackFunc(peer.get(), deserializer.getBuffer(), deserializer.getSize());
if(!continueListening)
cancelListen({ pubsubKey });
}
@@ -257,11 +278,12 @@ namespace sibs
listen(pubsubKey, nullptr, false);
}
}
-
+
+ std::vector<unsigned char> containedData((unsigned char*)data, (unsigned char*)data + size);
auto message = std::make_shared<Message>(MessageType::DATA);
message->append(pubsubKey.data.data(), pubsubKey.data.size());
- message->append(data, size);
- std::thread([this, pubsubKey, message]()
+ message->append(containedData.data(), containedData.size());
+ std::thread([this, pubsubKey, message](std::vector<unsigned char> containedData)
{
++putThreadCount;
std::unordered_set<int> peersMessaged;
@@ -294,7 +316,20 @@ namespace sibs
{
for(auto &peer : peersToMessage)
{
- connections.send(peer, message);
+ if(peer->routed)
+ {
+ auto routedMessage = std::make_shared<Message>(MessageType::DATA);
+ routedMessage->serializer.add((u16)peer->address.address.sin_family);
+ routedMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr);
+ routedMessage->serializer.add((u16)peer->address.address.sin_port);
+ routedMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ routedMessage->append(containedData.data(), containedData.size());
+ connections.send(serverPeer, routedMessage);
+ }
+ else
+ {
+ connections.send(peer, message);
+ }
}
peersToMessage.clear();
}
@@ -307,7 +342,7 @@ namespace sibs
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
--putThreadCount;
- }).detach();
+ }, std::move(containedData)).detach();
return true;
}
@@ -327,6 +362,7 @@ namespace sibs
message->append(listener.key.data.data(), listener.key.data.size());
connections.send(serverPeer, message);
+ // TODO: Unsubscribe to peers we are connected to indirectly (routed data)
for(auto &peer : peersToMessage)
{
--peer->sharedKeys;
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index f6fc0c3..3def1f7 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -92,12 +92,11 @@ namespace sibs
void BootstrapNode::messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
sibs::SafeDeserializer deserializer((const u8*)data, size);
- PubsubKey pubsubKey;
- deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
-
if(messageType == MessageType::SUBSCRIBE)
{
Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto &existingPeer : peers)
@@ -115,7 +114,7 @@ namespace sibs
newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size());
auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
{
- Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str());
+ Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str());
};
for(auto &existingPeer : peers)
{
@@ -139,6 +138,8 @@ namespace sibs
else if(messageType == MessageType::UNSUBSCRIBE)
{
Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto it = peers.begin(); it != peers.end(); ++it)
@@ -151,9 +152,45 @@ namespace sibs
}
}
}
+ else if(messageType == MessageType::DATA)
+ {
+ Log::debug("BootstrapNode: Received peer data route from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ u16 addressType = deserializer.extract<u16>();
+ u32 address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u16>();
+ Ipv4 targetAddress(addressType, address, port);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
+ auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
+ {
+ Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str());
+ };
+
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto &existingPeer : peers)
+ {
+ if(existingPeer->address == targetAddress)
+ {
+ auto existingPeerMessage = std::make_shared<Message>(MessageType::DATA);
+ existingPeerMessage->serializer.add((u16)peer->address.address.sin_family);
+ existingPeerMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr);
+ existingPeerMessage->serializer.add((u16)peer->address.address.sin_port);
+ existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ existingPeerMessage->append(deserializer.getBuffer(), deserializer.getSize());
+ connections.send(existingPeer, existingPeerMessage, sendCallbackFunc);
+ return;
+ }
+ }
+
+ Log::warn("BootstrapNode: Route target (ip: %s, port: %d) does not exist for pubsub key %s",
+ targetAddress.getAddress().c_str(),
+ targetAddress.getPort(),
+ pubsubKey.toString().c_str());
+ }
else
{
- Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe");
+ Log::warn("BootstrapNode: received message from client that was not subscribe, unsubscribe or data");
}
}
}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index e5a997f..cb71c8f 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -156,27 +156,53 @@ namespace sibs
connectCallbackFunc(peer, PubSubResult::RESULT_ERROR, e.what());
return PubSubConnectResult { peer, PubSubResult::RESULT_ERROR, e.what() };
}
+
+ int socketId = socket->udtSocket;
+ if(!server)
+ {
+ UDT::epoll_add_usock(eid, socket->udtSocket);
+ socket->eid = eid;
+ peersMutex.lock();
+ peers[socket->udtSocket] = peer;
+ peer->socket = std::move(socket);
+ peer->address = address;
+ peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
+ peer->type = (server ? PeerType::SERVER : PeerType::CLIENT);
+ peer->routed = true;
+ peer->sharedKeys = 1;
+ peersMutex.unlock();
+ }
Log::debug("DirectConnections: Connecting to peer (ip: %s, port: %d, rendezvous: %s)", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no");
- if(UDT::connect(socket->udtSocket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
+ if(UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
+ if(!server)
+ peers.erase(peers.find(socketId));
+
if(connectCallbackFunc)
connectCallbackFunc(peer, PubSubResult::RESULT_ERROR, UDT::getlasterror_desc());
+
return PubSubConnectResult{ peer, PubSubResult::RESULT_ERROR, UDT::getlasterror_desc() };
}
-
- UDT::epoll_add_usock(eid, socket->udtSocket);
- socket->eid = eid;
- peersMutex.lock();
- peers[socket->udtSocket] = peer;
- peer->socket = std::move(socket);
- peer->address = address;
- peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
- peer->type = (server ? PeerType::SERVER : PeerType::CLIENT);
- peersMutex.unlock();
+
+ if(server)
+ {
+ UDT::epoll_add_usock(eid, socketId);
+ socket->eid = eid;
+ peersMutex.lock();
+ peers[socketId] = peer;
+ peer->socket = std::move(socket);
+ peer->address = address;
+ peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
+ peer->type = PeerType::SERVER;
+ peer->sharedKeys = 1;
+ peersMutex.unlock();
+ }
+ peer->routed = false;
if(connectCallbackFunc)
connectCallbackFunc(peer, PubSubResult::RESULT_OK, "");
+
return PubSubConnectResult { peer, PubSubResult::RESULT_OK, "" };
});
connectionResultsMutex.unlock();
diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp
index 75e4348..76f1c63 100644
--- a/src/IpAddress.cpp
+++ b/src/IpAddress.cpp
@@ -194,6 +194,14 @@ namespace sibs
address.sin_addr.s_addr = INADDR_ANY;
memset(address.sin_zero, 0, sizeof(address.sin_zero));
}
+
+ Ipv4::Ipv4(u16 family, u32 addr, u16 port)
+ {
+ address.sin_family = family;
+ address.sin_addr.s_addr = addr;
+ address.sin_port = port;
+ memset(address.sin_zero, 0, sizeof(address.sin_zero));
+ }
Ipv4::Ipv4(const Ipv4 &other)
{
diff --git a/src/Message.cpp b/src/Message.cpp
index 58b39da..89973d3 100644
--- a/src/Message.cpp
+++ b/src/Message.cpp
@@ -5,11 +5,11 @@ namespace sibs
Message::Message(MessageType messageType)
{
static_assert(sizeof(MessageType) == sizeof(u8), "Whoops, message type size has changed, the below code doesn't work");
- rawData.push_back((u8)messageType);
+ serializer.add((u8)messageType);
}
void Message::append(const void *data, const usize size)
{
- rawData.insert(rawData.end(), (const u8*)data, (const u8*)data + size);
+ serializer.add((const u8*)data, size);
}
} \ No newline at end of file