From 81b59a08b69d4872b06930aedd06a957c4acb7cc Mon Sep 17 00:00:00 2001 From: Aleksi Lindeman Date: Fri, 23 Nov 2018 15:44:08 +0100 Subject: Add data routing for failed p2p connections --- src/BootstrapConnection.cpp | 66 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 15 deletions(-) (limited to 'src/BootstrapConnection.cpp') diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 4ace97c..d914646 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -114,11 +114,7 @@ namespace sibs { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); - Ipv4 newPeerAddress; - newPeerAddress.address.sin_family = addressFamily; - newPeerAddress.address.sin_addr.s_addr = ipv4Address; - newPeerAddress.address.sin_port = port; - memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); + Ipv4 newPeerAddress(addressFamily, ipv4Address, port); Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { @@ -132,7 +128,6 @@ namespace sibs return; } subscribeDataIt->second.peers.push_back(peer); - ++peer->sharedKeys; Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } else @@ -149,11 +144,18 @@ namespace sibs void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { - if(size < PUBSUB_KEY_LENGTH) - return; + sibs::SafeDeserializer deserializer((const u8*)data, size); + Ipv4 targetAddress; + if(peer == serverPeer) + { + u16 addressType = deserializer.extract(); + u32 address = deserializer.extract(); + u16 port = deserializer.extract(); + targetAddress = Ipv4(addressType, address, port); + } PubsubKey pubsubKey; - memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); if(messageType == MessageType::DATA) { subscribeDataMutex.lock(); @@ -165,11 +167,30 @@ namespace sibs return; } auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; + if(peer == serverPeer) + { + for(auto pubsubPeer : listenerFuncIt->second.peers) + { + if(pubsubPeer->address == targetAddress) + { + peer = pubsubPeer; + Log::debug("BootstrapConnection::receiveDataFromPeer: Received routed data from server, originator peer: ip: %s, port: %d", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); + break; + } + } + + if(peer == serverPeer) + { + Log::warn("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + subscribeDataMutex.unlock(); + return; + } + } subscribeDataMutex.unlock(); if(listenCallbackFunc) { - bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH); + bool continueListening = listenCallbackFunc(peer.get(), deserializer.getBuffer(), deserializer.getSize()); if(!continueListening) cancelListen({ pubsubKey }); } @@ -257,11 +278,12 @@ namespace sibs listen(pubsubKey, nullptr, false); } } - + + std::vector containedData((unsigned char*)data, (unsigned char*)data + size); auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); - message->append(data, size); - std::thread([this, pubsubKey, message]() + message->append(containedData.data(), containedData.size()); + std::thread([this, pubsubKey, message](std::vector containedData) { ++putThreadCount; std::unordered_set peersMessaged; @@ -294,7 +316,20 @@ namespace sibs { for(auto &peer : peersToMessage) { - connections.send(peer, message); + if(peer->routed) + { + auto routedMessage = std::make_shared(MessageType::DATA); + routedMessage->serializer.add((u16)peer->address.address.sin_family); + routedMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr); + routedMessage->serializer.add((u16)peer->address.address.sin_port); + routedMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + routedMessage->append(containedData.data(), containedData.size()); + connections.send(serverPeer, routedMessage); + } + else + { + connections.send(peer, message); + } } peersToMessage.clear(); } @@ -307,7 +342,7 @@ namespace sibs std::this_thread::sleep_for(std::chrono::milliseconds(200)); } --putThreadCount; - }).detach(); + }, std::move(containedData)).detach(); return true; } @@ -327,6 +362,7 @@ namespace sibs message->append(listener.key.data.data(), listener.key.data.size()); connections.send(serverPeer, message); + // TODO: Unsubscribe to peers we are connected to indirectly (routed data) for(auto &peer : peersToMessage) { --peer->sharedKeys; -- cgit v1.2.3