From eda94456add9a65d1821302e343bef4021d2a773 Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Tue, 16 Oct 2018 00:37:21 +0200 Subject: Reuse peer connection if subscribed to same key --- src/BootstrapConnection.cpp | 165 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 138 insertions(+), 27 deletions(-) (limited to 'src/BootstrapConnection.cpp') diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index e5f5178..0237a90 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -8,7 +8,7 @@ namespace sibs { connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { - std::lock_guard lock(subscribedPeersMutex); + std::lock_guard lock(subscribedPeersMutex); for(auto &topicUsers : subscribedPeers) { for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); ) @@ -30,7 +30,7 @@ namespace sibs connectResult = result; connectResultStr = resultStr; connected = true; - }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); while(!connected) { @@ -46,23 +46,27 @@ namespace sibs } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key - void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, const void *data, const usize size) + void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { + if(messageType != MessageType::SUBSCRIBE) + { + Log::warn("BootstrapConnection: received message from server that was not subscribe"); + return; + } + Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node"); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); - listenerCallbackFuncMutex.lock(); + // we want lock to live this whole scope so we dont connect to peer when cancelListen is called + std::lock_guard lock(listenerCallbackFuncMutex); auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenerFuncIt == listenCallbackFuncs.end()) { - Log::debug("BoostrapConnection: No listener found for key XXX, ignoring..."); - listenerCallbackFuncMutex.unlock(); + Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } - auto listenerCallbackFunc = listenerFuncIt->second; - listenerCallbackFuncMutex.unlock(); while(!deserializer.empty()) { @@ -76,18 +80,19 @@ namespace sibs newPeerAddress.address.sin_addr.s_addr = ipv4Address; newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); - connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr newPeer, PubSubResult result, const std::string &resultStr) + Log::debug("BootstrapConnection: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); + connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { if(result == PubSubResult::OK) { - subscribedPeersMutex.lock(); - subscribedPeers[pubsubKey].push_back(newPeer); - subscribedPeersMutex.unlock(); - Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", newPeer->address.getAddress().c_str(), newPeer->address.getPort()); + std::lock_guard lock(subscribedPeersMutex); + subscribedPeers[pubsubKey].push_back(peer); + ++peer->sharedKeys; + Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } else Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); - }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else { @@ -97,46 +102,152 @@ namespace sibs } } - void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr peer, const void *data, const usize size) + void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { - if(listenCallbackFunc) - listenCallbackFunc(peer.get(), data, size); + if(size < PUBSUB_KEY_LENGTH) + return; + + PubsubKey pubsubKey; + memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); + if(messageType == MessageType::DATA) + { + listenerCallbackFuncMutex.lock(); + auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); + if(listenerFuncIt == listenCallbackFuncs.end()) + { + listenerCallbackFuncMutex.unlock(); + Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } + auto listenCallbackFunc = listenerFuncIt->second; + listenerCallbackFuncMutex.unlock(); + + if(listenCallbackFunc) + { + bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH); + if(!continueListening) + cancelListen({ pubsubKey }); + } + } + else if(messageType == MessageType::UNSUBSCRIBE) + { + Log::debug("BootstrapConnection: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); + std::lock_guard subscribersMutex(subscribedPeersMutex); + auto peersListIt = subscribedPeers.find(pubsubKey); + if(peersListIt == subscribedPeers.end()) + return; + + for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it) + { + auto existingPeer = *it; + if(*existingPeer == *peer) + { + peersListIt->second.erase(it); + --peer->sharedKeys; + if(peer->sharedKeys <= 0) + connections.removePeer(peer->socket->udtSocket); + break; + } + } + } + else + { + Log::warn("BootstrapConnection: received message from peer that was not data or unsubscribe"); + } } - void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) { { - std::lock_guard lock(listenerCallbackFuncMutex); + std::lock_guard lock(listenerCallbackFuncMutex); if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end()) throw PubsubKeyAlreadyListeningException(""); listenCallbackFuncs[pubsubKey] = callbackFunc; } - connections.send(serverPeer, std::make_shared>(pubsubKey.data.begin(), pubsubKey.data.end()), - [](PubSubResult result, const std::string &resultStr) + + auto message = std::make_shared(MessageType::SUBSCRIBE); + message->append(pubsubKey.data.data(), pubsubKey.data.size()); + connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); }); + + return { pubsubKey }; } - void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr> data) + bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { + if(size > 819200) // 800kb + return false; + { - std::lock_guard lock(listenerCallbackFuncMutex); + std::lock_guard lock(listenerCallbackFuncMutex); auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second) - listenCallbackFuncIt->second(nullptr, data->data(), data->size()); + listenCallbackFuncIt->second(nullptr, data, size); + + if(listenCallbackFuncIt == listenCallbackFuncs.end()) + Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str()); } - std::lock_guard lock(subscribedPeersMutex); + std::lock_guard lock(subscribedPeersMutex); auto peersIt = subscribedPeers.find(pubsubKey); if(peersIt == subscribedPeers.end()) { - return; + Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); + return true; } + auto message = std::make_shared(MessageType::DATA); + message->append(pubsubKey.data.data(), pubsubKey.data.size()); + message->append(data, size); for(auto &peer : peersIt->second) { - connections.send(peer, data); + connections.send(peer, message); + } + return true; + } + + bool BootstrapConnection::cancelListen(const ListenHandle &listener) + { + { + std::lock_guard lock(listenerCallbackFuncMutex); + auto it = listenCallbackFuncs.find(listener.key); + if(it == listenCallbackFuncs.end()) + return false; + listenCallbackFuncs.erase(it); + + auto message = std::make_shared(MessageType::UNSUBSCRIBE); + message->append(listener.key.data.data(), listener.key.data.size()); + connections.send(serverPeer, message); + + std::lock_guard subscribersMutex(subscribedPeersMutex); + auto peersListIt = subscribedPeers.find(listener.key); + // this will happen if there are no other peers subscribed to the key + if(peersListIt == subscribedPeers.end()) + return true; + + for(auto &peer : peersListIt->second) + { + --peer->sharedKeys; + if(peer->sharedKeys <= 0) + { + // disconnect from peer + connections.removePeer(peer->socket->udtSocket); + } + else + { + // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore + connections.send(peer, message); + } + } + subscribedPeers.erase(peersListIt); } + return true; + } + + std::vector> BootstrapConnection::getPeers() + { + return connections.getPeers(); } } -- cgit v1.2.3