From 54254462e432dcc6ef2bb306a9ee773d21314d19 Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Fri, 19 Oct 2018 23:01:52 +0200 Subject: Retry put for 30 sec to wait for peer connections --- src/BootstrapConnection.cpp | 230 +++++++++++++++++++++++++++++--------------- 1 file changed, 153 insertions(+), 77 deletions(-) (limited to 'src/BootstrapConnection.cpp') diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 35bb11e..f83b21c 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -1,22 +1,31 @@ #include "../include/sibs/BootstrapConnection.hpp" #include "../include/Log.hpp" #include +#include +#include + +namespace chrono = std::chrono; namespace sibs { - BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) + BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) : + alive(true), + putThreadCount(0) { connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { - std::lock_guard lock(subscribedPeersMutex); - for(auto &topicUsers : subscribedPeers) + std::lock_guard lock(subscribeDataMutex); + for(auto &it : subscribeData) { - for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); ) + for(auto subscribedPeersIt = it.second.peers.begin(), end = it.second.peers.end(); + subscribedPeersIt != end; + ++subscribedPeersIt) { - if(peer->address == (*it)->address) - it = topicUsers.second.erase(it); - else - ++it; + if(peer->address == (*subscribedPeersIt)->address) + { + it.second.peers.erase(subscribedPeersIt); + break; + } } } }); @@ -44,6 +53,15 @@ namespace sibs throw BootstrapConnectionException(errMsg); } } + + BootstrapConnection::~BootstrapConnection() + { + alive = false; + while(putThreadCount > 0) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) @@ -60,12 +78,14 @@ namespace sibs deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); // we want lock to live this whole scope so we dont connect to peer when cancelListen is called - std::lock_guard lock(listenerCallbackFuncMutex); - auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenerFuncIt == listenCallbackFuncs.end()) { - Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); - return; + std::lock_guard lock(subscribeDataMutex); + auto subscribeDataIt = subscribeData.find(pubsubKey); + if(subscribeDataIt == subscribeData.end()) + { + Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } } while(!deserializer.empty()) @@ -85,8 +105,14 @@ namespace sibs { if(result == PubSubResult::OK) { - std::lock_guard lock(subscribedPeersMutex); - subscribedPeers[pubsubKey].push_back(peer); + std::lock_guard lock(subscribeDataMutex); + auto subscribeDataIt = subscribeData.find(pubsubKey); + if(subscribeDataIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } + subscribeDataIt->second.peers.push_back(peer); ++peer->sharedKeys; Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } @@ -111,16 +137,16 @@ namespace sibs memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); if(messageType == MessageType::DATA) { - listenerCallbackFuncMutex.lock(); - auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenerFuncIt == listenCallbackFuncs.end()) + subscribeDataMutex.lock(); + auto listenerFuncIt = subscribeData.find(pubsubKey); + if(listenerFuncIt == subscribeData.end()) { - listenerCallbackFuncMutex.unlock(); + subscribeDataMutex.unlock(); Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } - auto listenCallbackFunc = listenerFuncIt->second; - listenerCallbackFuncMutex.unlock(); + auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; + subscribeDataMutex.unlock(); if(listenCallbackFunc) { @@ -132,17 +158,17 @@ namespace sibs else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); - std::lock_guard subscribersMutex(subscribedPeersMutex); - auto peersListIt = subscribedPeers.find(pubsubKey); - if(peersListIt == subscribedPeers.end()) + std::lock_guard subscribersMutex(subscribeDataMutex); + auto peersListIt = subscribeData.find(pubsubKey); + if(peersListIt == subscribeData.end()) return; - for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it) + for(auto it = peersListIt->second.peers.begin(); it != peersListIt->second.peers.end(); ++it) { auto existingPeer = *it; if(*existingPeer == *peer) { - peersListIt->second.erase(it); + peersListIt->second.peers.erase(it); --peer->sharedKeys; if(peer->sharedKeys <= 0) connections.removePeer(peer->socket->udtSocket); @@ -155,23 +181,28 @@ namespace sibs Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe"); } } - - ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + + ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc) { { - std::lock_guard lock(listenerCallbackFuncMutex); - auto it = listenCallbackFuncs.find(pubsubKey); - if(it != listenCallbackFuncs.end()) + std::lock_guard lock(subscribeDataMutex); + auto it = subscribeData.find(pubsubKey); + if(it != subscribeData.end()) { Log::warn("BootstrapConnection::listen called on existing listener, overwriting callback function"); - it->second = callbackFunc; + if(registerCallbackFunc) + it->second.listenCallbackFunc = callbackFunc; return { pubsubKey }; } - listenCallbackFuncs[pubsubKey] = callbackFunc; + auto &data = subscribeData[pubsubKey]; + if(registerCallbackFunc) + data.listenCallbackFunc = callbackFunc; + data.listenStartTimeMs = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); } auto message = std::make_shared(MessageType::SUBSCRIBE); message->append(pubsubKey.data.data(), pubsubKey.data.size()); + Log::debug("BootstrapConnection::listen: starting to listen to %s", pubsubKey.toString().c_str()); connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); @@ -180,81 +211,126 @@ namespace sibs return { pubsubKey }; } + ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + { + return listen(pubsubKey, callbackFunc, true); + } + bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { if(size > 819200) // 800kb return false; { - std::lock_guard lock(listenerCallbackFuncMutex); - auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second) - listenCallbackFuncIt->second(nullptr, data, size); + std::lock_guard lock(subscribeDataMutex); + auto listenCallbackFuncIt = subscribeData.find(pubsubKey); + if(listenCallbackFuncIt != subscribeData.end() && listenCallbackFuncIt->second.listenCallbackFunc) + listenCallbackFuncIt->second.listenCallbackFunc(nullptr, data, size); - if(listenCallbackFuncIt == listenCallbackFuncs.end()) - Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str()); - } - - std::lock_guard lock(subscribedPeersMutex); - auto peersIt = subscribedPeers.find(pubsubKey); - if(peersIt == subscribedPeers.end()) - { - Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); - return true; + if(listenCallbackFuncIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::put on key '%s' which we are not listening to, automatically listening now", pubsubKey.toString().c_str()); + listen(pubsubKey, nullptr, false); + } } auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); message->append(data, size); - for(auto &peer : peersIt->second) + std::thread([this, pubsubKey, message]() { - connections.send(peer, message); - } + ++putThreadCount; + std::unordered_set peersMessaged; + std::vector> peersToMessage; + i64 listenStartTimeMs = 0; + while(alive) + { + { + std::lock_guard lock(subscribeDataMutex); + auto subscribeIt = subscribeData.find(pubsubKey); + if(subscribeIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + continue; + } + + listenStartTimeMs = subscribeIt->second.listenStartTimeMs; + + for(auto &peer : subscribeIt->second.peers) + { + if(peersMessaged.find(peer->socket->udtSocket) != peersMessaged.end()) + continue; + peersToMessage.push_back(peer); + peersMessaged.insert(peer->socket->udtSocket); + } + } + + if(!peersToMessage.empty()) + { + for(auto &peer : peersToMessage) + { + connections.send(peer, message); + } + peersToMessage.clear(); + } + + const i64 half_min_ms = 1000 * 30; + i64 now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); + if(now - listenStartTimeMs > half_min_ms) + break; + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + --putThreadCount; + }).detach(); return true; } bool BootstrapConnection::cancelListen(const ListenHandle &listener) { + std::vector> peersToMessage; { - std::lock_guard lock(listenerCallbackFuncMutex); - auto it = listenCallbackFuncs.find(listener.key); - if(it == listenCallbackFuncs.end()) + std::lock_guard lock(subscribeDataMutex); + auto it = subscribeData.find(listener.key); + if(it == subscribeData.end()) return false; - listenCallbackFuncs.erase(it); - auto message = std::make_shared(MessageType::UNSUBSCRIBE); - message->append(listener.key.data.data(), listener.key.data.size()); - connections.send(serverPeer, message); + peersToMessage = it->second.peers; + } - std::lock_guard subscribersMutex(subscribedPeersMutex); - auto peersListIt = subscribedPeers.find(listener.key); - // this will happen if there are no other peers subscribed to the key - if(peersListIt == subscribedPeers.end()) - return true; + auto message = std::make_shared(MessageType::UNSUBSCRIBE); + message->append(listener.key.data.data(), listener.key.data.size()); + connections.send(serverPeer, message); - for(auto &peer : peersListIt->second) + for(auto &peer : peersToMessage) + { + --peer->sharedKeys; + if(peer->sharedKeys <= 0) { - --peer->sharedKeys; - if(peer->sharedKeys <= 0) - { - // disconnect from peer - connections.removePeer(peer->socket->udtSocket); - } - else - { - // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore - connections.send(peer, message); - } + // disconnect from peer + connections.removePeer(peer->socket->udtSocket); } - subscribedPeers.erase(peersListIt); + else + { + // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore + connections.send(peer, message); + } + } + + { + std::lock_guard lock(subscribeDataMutex); + auto it = subscribeData.find(listener.key); + if(it != subscribeData.end()) + subscribeData.erase(it); } return true; } bool BootstrapConnection::areWeListeningOnKey(const PubsubKey &pubsubKey) { - std::lock_guard lock(listenerCallbackFuncMutex); - return listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end(); + std::lock_guard lock(subscribeDataMutex); + return subscribeData.find(pubsubKey) != subscribeData.end(); } std::vector> BootstrapConnection::getPeers() -- cgit v1.2.3