From 1167a78fd14b8a4a421378c8815a4992e06d11cd Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 23 Nov 2018 15:44:08 +0100 Subject: Add data routing for failed p2p connections --- src/BootstrapNode.cpp | 47 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) (limited to 'src/BootstrapNode.cpp') diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index f6fc0c3..3def1f7 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -92,12 +92,11 @@ namespace sibs void BootstrapNode::messageFromClient(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { sibs::SafeDeserializer deserializer((const u8*)data, size); - PubsubKey pubsubKey; - deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); - if(messageType == MessageType::SUBSCRIBE) { Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); std::lock_guard lock(subscribedPeersMutex); auto &peers = subscribedPeers[pubsubKey]; for(auto &existingPeer : peers) @@ -115,7 +114,7 @@ namespace sibs newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size()); auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) { - Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str()); + Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str()); }; for(auto &existingPeer : peers) { @@ -139,6 +138,8 @@ namespace sibs else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); std::lock_guard lock(subscribedPeersMutex); auto &peers = subscribedPeers[pubsubKey]; for(auto it = peers.begin(); it != peers.end(); ++it) @@ -151,9 +152,45 @@ namespace sibs } } } + else if(messageType == MessageType::DATA) + { + Log::debug("BootstrapNode: Received peer data route from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + u16 addressType = deserializer.extract(); + u32 address = deserializer.extract(); + u16 port = deserializer.extract(); + Ipv4 targetAddress(addressType, address, port); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); + auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) + { + Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str()); + }; + + std::lock_guard lock(subscribedPeersMutex); + auto &peers = subscribedPeers[pubsubKey]; + for(auto &existingPeer : peers) + { + if(existingPeer->address == targetAddress) + { + auto existingPeerMessage = std::make_shared(MessageType::DATA); + existingPeerMessage->serializer.add((u16)peer->address.address.sin_family); + existingPeerMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr); + existingPeerMessage->serializer.add((u16)peer->address.address.sin_port); + existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + existingPeerMessage->append(deserializer.getBuffer(), deserializer.getSize()); + connections.send(existingPeer, existingPeerMessage, sendCallbackFunc); + return; + } + } + + Log::warn("BootstrapNode: Route target (ip: %s, port: %d) does not exist for pubsub key %s", + targetAddress.getAddress().c_str(), + targetAddress.getPort(), + pubsubKey.toString().c_str()); + } else { - Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe"); + Log::warn("BootstrapNode: received message from client that was not subscribe, unsubscribe or data"); } } } -- cgit v1.2.3