aboutsummaryrefslogtreecommitdiff
path: root/src/BootstrapNode.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/BootstrapNode.cpp')
-rw-r--r--src/BootstrapNode.cpp47
1 files changed, 42 insertions, 5 deletions
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index f6fc0c3..3def1f7 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -92,12 +92,11 @@ namespace sibs
void BootstrapNode::messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
sibs::SafeDeserializer deserializer((const u8*)data, size);
- PubsubKey pubsubKey;
- deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
-
if(messageType == MessageType::SUBSCRIBE)
{
Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto &existingPeer : peers)
@@ -115,7 +114,7 @@ namespace sibs
newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size());
auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
{
- Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str());
+ Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str());
};
for(auto &existingPeer : peers)
{
@@ -139,6 +138,8 @@ namespace sibs
else if(messageType == MessageType::UNSUBSCRIBE)
{
Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto it = peers.begin(); it != peers.end(); ++it)
@@ -151,9 +152,45 @@ namespace sibs
}
}
}
+ else if(messageType == MessageType::DATA)
+ {
+ Log::debug("BootstrapNode: Received peer data route from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ u16 addressType = deserializer.extract<u16>();
+ u32 address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u16>();
+ Ipv4 targetAddress(addressType, address, port);
+ PubsubKey pubsubKey;
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
+ auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
+ {
+ Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str());
+ };
+
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto &existingPeer : peers)
+ {
+ if(existingPeer->address == targetAddress)
+ {
+ auto existingPeerMessage = std::make_shared<Message>(MessageType::DATA);
+ existingPeerMessage->serializer.add((u16)peer->address.address.sin_family);
+ existingPeerMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr);
+ existingPeerMessage->serializer.add((u16)peer->address.address.sin_port);
+ existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ existingPeerMessage->append(deserializer.getBuffer(), deserializer.getSize());
+ connections.send(existingPeer, existingPeerMessage, sendCallbackFunc);
+ return;
+ }
+ }
+
+ Log::warn("BootstrapNode: Route target (ip: %s, port: %d) does not exist for pubsub key %s",
+ targetAddress.getAddress().c_str(),
+ targetAddress.getPort(),
+ pubsubKey.toString().c_str());
+ }
else
{
- Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe");
+ Log::warn("BootstrapNode: received message from client that was not subscribe, unsubscribe or data");
}
}
}