diff options
author | dec05eba <0xdec05eba@gmail.com> | 2018-10-19 23:01:52 +0200 |
---|---|---|
committer | dec05eba <0xdec05eba@gmail.com> | 2018-10-19 23:03:20 +0200 |
commit | 54254462e432dcc6ef2bb306a9ee773d21314d19 (patch) | |
tree | 3334799426ba0186829de1bbffe0500a64331c31 | |
parent | 3565289c19974ca874f87429cc74a87558249c8e (diff) |
Retry put for 30 sec to wait for peer connections
-rw-r--r-- | include/sibs/BootstrapConnection.hpp | 18 | ||||
-rw-r--r-- | src/BootstrapConnection.cpp | 230 | ||||
-rw-r--r-- | src/BootstrapNode.cpp | 4 | ||||
-rw-r--r-- | src/DirectConnection.cpp | 45 | ||||
-rw-r--r-- | src/Log.cpp | 12 | ||||
-rw-r--r-- | tests/main.cpp | 8 |
6 files changed, 187 insertions, 130 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp index 9e2dfff..8156bd7 100644 --- a/include/sibs/BootstrapConnection.hpp +++ b/include/sibs/BootstrapConnection.hpp @@ -4,6 +4,7 @@ #include "DirectConnection.hpp" #include "PubsubKey.hpp" #include <mutex> +#include <atomic> namespace sibs { @@ -23,6 +24,13 @@ namespace sibs // Return false if you want to stop listening on the key using BoostrapConnectionListenCallbackFunc = std::function<bool(const DirectConnectionPeer *peer, const void *data, const usize size)>; + struct SubscribeData + { + BoostrapConnectionListenCallbackFunc listenCallbackFunc = nullptr; + std::vector<std::shared_ptr<DirectConnectionPeer>> peers; + i64 listenStartTimeMs = 0; + }; + struct ListenHandle { PubsubKey key; @@ -34,6 +42,7 @@ namespace sibs public: // Throws BootstrapConnectionException on error BootstrapConnection(const Ipv4 &bootstrapAddress); + ~BootstrapConnection(); // If we are already listening on the key @pubsubKey then the callback function is overwritten ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); @@ -46,14 +55,15 @@ namespace sibs std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers(); private: + ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc); void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size); void receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size); private: DirectConnections connections; std::shared_ptr<DirectConnectionPeer> serverPeer; - PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs; - PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers; - std::recursive_mutex listenerCallbackFuncMutex; - std::recursive_mutex subscribedPeersMutex; + PubsubKeyMap<SubscribeData> subscribeData; + std::recursive_mutex subscribeDataMutex; + bool alive; + std::atomic_int putThreadCount; }; } diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 35bb11e..f83b21c 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -1,22 +1,31 @@ #include "../include/sibs/BootstrapConnection.hpp" #include "../include/Log.hpp" #include <sibs/SafeDeserializer.hpp> +#include <chrono> +#include <unordered_set> + +namespace chrono = std::chrono; namespace sibs { - BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) + BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) : + alive(true), + putThreadCount(0) { connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer) { - std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex); - for(auto &topicUsers : subscribedPeers) + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + for(auto &it : subscribeData) { - for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); ) + for(auto subscribedPeersIt = it.second.peers.begin(), end = it.second.peers.end(); + subscribedPeersIt != end; + ++subscribedPeersIt) { - if(peer->address == (*it)->address) - it = topicUsers.second.erase(it); - else - ++it; + if(peer->address == (*subscribedPeersIt)->address) + { + it.second.peers.erase(subscribedPeersIt); + break; + } } } }); @@ -44,6 +53,15 @@ namespace sibs throw BootstrapConnectionException(errMsg); } } + + BootstrapConnection::~BootstrapConnection() + { + alive = false; + while(putThreadCount > 0) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size) @@ -60,12 +78,14 @@ namespace sibs deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); // we want lock to live this whole scope so we dont connect to peer when cancelListen is called - std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex); - auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenerFuncIt == listenCallbackFuncs.end()) { - Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); - return; + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto subscribeDataIt = subscribeData.find(pubsubKey); + if(subscribeDataIt == subscribeData.end()) + { + Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } } while(!deserializer.empty()) @@ -85,8 +105,14 @@ namespace sibs { if(result == PubSubResult::OK) { - std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex); - subscribedPeers[pubsubKey].push_back(peer); + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto subscribeDataIt = subscribeData.find(pubsubKey); + if(subscribeDataIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } + subscribeDataIt->second.peers.push_back(peer); ++peer->sharedKeys; Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } @@ -111,16 +137,16 @@ namespace sibs memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); if(messageType == MessageType::DATA) { - listenerCallbackFuncMutex.lock(); - auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenerFuncIt == listenCallbackFuncs.end()) + subscribeDataMutex.lock(); + auto listenerFuncIt = subscribeData.find(pubsubKey); + if(listenerFuncIt == subscribeData.end()) { - listenerCallbackFuncMutex.unlock(); + subscribeDataMutex.unlock(); Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } - auto listenCallbackFunc = listenerFuncIt->second; - listenerCallbackFuncMutex.unlock(); + auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; + subscribeDataMutex.unlock(); if(listenCallbackFunc) { @@ -132,17 +158,17 @@ namespace sibs else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); - std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex); - auto peersListIt = subscribedPeers.find(pubsubKey); - if(peersListIt == subscribedPeers.end()) + std::lock_guard<std::recursive_mutex> subscribersMutex(subscribeDataMutex); + auto peersListIt = subscribeData.find(pubsubKey); + if(peersListIt == subscribeData.end()) return; - for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it) + for(auto it = peersListIt->second.peers.begin(); it != peersListIt->second.peers.end(); ++it) { auto existingPeer = *it; if(*existingPeer == *peer) { - peersListIt->second.erase(it); + peersListIt->second.peers.erase(it); --peer->sharedKeys; if(peer->sharedKeys <= 0) connections.removePeer(peer->socket->udtSocket); @@ -155,23 +181,28 @@ namespace sibs Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe"); } } - - ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + + ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc) { { - std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex); - auto it = listenCallbackFuncs.find(pubsubKey); - if(it != listenCallbackFuncs.end()) + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto it = subscribeData.find(pubsubKey); + if(it != subscribeData.end()) { Log::warn("BootstrapConnection::listen called on existing listener, overwriting callback function"); - it->second = callbackFunc; + if(registerCallbackFunc) + it->second.listenCallbackFunc = callbackFunc; return { pubsubKey }; } - listenCallbackFuncs[pubsubKey] = callbackFunc; + auto &data = subscribeData[pubsubKey]; + if(registerCallbackFunc) + data.listenCallbackFunc = callbackFunc; + data.listenStartTimeMs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count(); } auto message = std::make_shared<Message>(MessageType::SUBSCRIBE); message->append(pubsubKey.data.data(), pubsubKey.data.size()); + Log::debug("BootstrapConnection::listen: starting to listen to %s", pubsubKey.toString().c_str()); connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); @@ -180,81 +211,126 @@ namespace sibs return { pubsubKey }; } + ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) + { + return listen(pubsubKey, callbackFunc, true); + } + bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { if(size > 819200) // 800kb return false; { - std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex); - auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey); - if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second) - listenCallbackFuncIt->second(nullptr, data, size); + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto listenCallbackFuncIt = subscribeData.find(pubsubKey); + if(listenCallbackFuncIt != subscribeData.end() && listenCallbackFuncIt->second.listenCallbackFunc) + listenCallbackFuncIt->second.listenCallbackFunc(nullptr, data, size); - if(listenCallbackFuncIt == listenCallbackFuncs.end()) - Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str()); - } - - std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex); - auto peersIt = subscribedPeers.find(pubsubKey); - if(peersIt == subscribedPeers.end()) - { - Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); - return true; + if(listenCallbackFuncIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::put on key '%s' which we are not listening to, automatically listening now", pubsubKey.toString().c_str()); + listen(pubsubKey, nullptr, false); + } } auto message = std::make_shared<Message>(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); message->append(data, size); - for(auto &peer : peersIt->second) + std::thread([this, pubsubKey, message]() { - connections.send(peer, message); - } + ++putThreadCount; + std::unordered_set<int> peersMessaged; + std::vector<std::shared_ptr<DirectConnectionPeer>> peersToMessage; + i64 listenStartTimeMs = 0; + while(alive) + { + { + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto subscribeIt = subscribeData.find(pubsubKey); + if(subscribeIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + continue; + } + + listenStartTimeMs = subscribeIt->second.listenStartTimeMs; + + for(auto &peer : subscribeIt->second.peers) + { + if(peersMessaged.find(peer->socket->udtSocket) != peersMessaged.end()) + continue; + peersToMessage.push_back(peer); + peersMessaged.insert(peer->socket->udtSocket); + } + } + + if(!peersToMessage.empty()) + { + for(auto &peer : peersToMessage) + { + connections.send(peer, message); + } + peersToMessage.clear(); + } + + const i64 half_min_ms = 1000 * 30; + i64 now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count(); + if(now - listenStartTimeMs > half_min_ms) + break; + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + --putThreadCount; + }).detach(); return true; } bool BootstrapConnection::cancelListen(const ListenHandle &listener) { + std::vector<std::shared_ptr<DirectConnectionPeer>> peersToMessage; { - std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex); - auto it = listenCallbackFuncs.find(listener.key); - if(it == listenCallbackFuncs.end()) + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto it = subscribeData.find(listener.key); + if(it == subscribeData.end()) return false; - listenCallbackFuncs.erase(it); - auto message = std::make_shared<Message>(MessageType::UNSUBSCRIBE); - message->append(listener.key.data.data(), listener.key.data.size()); - connections.send(serverPeer, message); + peersToMessage = it->second.peers; + } - std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex); - auto peersListIt = subscribedPeers.find(listener.key); - // this will happen if there are no other peers subscribed to the key - if(peersListIt == subscribedPeers.end()) - return true; + auto message = std::make_shared<Message>(MessageType::UNSUBSCRIBE); + message->append(listener.key.data.data(), listener.key.data.size()); + connections.send(serverPeer, message); - for(auto &peer : peersListIt->second) + for(auto &peer : peersToMessage) + { + --peer->sharedKeys; + if(peer->sharedKeys <= 0) { - --peer->sharedKeys; - if(peer->sharedKeys <= 0) - { - // disconnect from peer - connections.removePeer(peer->socket->udtSocket); - } - else - { - // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore - connections.send(peer, message); - } + // disconnect from peer + connections.removePeer(peer->socket->udtSocket); } - subscribedPeers.erase(peersListIt); + else + { + // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore + connections.send(peer, message); + } + } + + { + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + auto it = subscribeData.find(listener.key); + if(it != subscribeData.end()) + subscribeData.erase(it); } return true; } bool BootstrapConnection::areWeListeningOnKey(const PubsubKey &pubsubKey) { - std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex); - return listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end(); + std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex); + return subscribeData.find(pubsubKey) != subscribeData.end(); } std::vector<std::shared_ptr<DirectConnectionPeer>> BootstrapConnection::getPeers() diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index 81df6d7..f6fc0c3 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -17,7 +17,7 @@ namespace sibs connections(address.getPort()), socket(connections.createSocket(address, false, true)) { - if(connections.port != address.getPort()) + if(address.getPort() == 0 || connections.port != address.getPort()) { throw SocketCreateException("BootstrapNode: Failed to bind port " + std::to_string(address.getPort())); } @@ -75,7 +75,7 @@ namespace sibs char clientHost[NI_MAXHOST]; char clientService[NI_MAXSERV]; getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV); - Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket); + Log::debug("BootstrapNode::acceptConnections: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket); std::lock_guard<std::mutex> lock(connections.peersMutex); UDT::epoll_add_usock(connections.eid, clientUdtSocket); diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 8034c50..010b8af 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -82,46 +82,19 @@ namespace sibs if(rendezvous || bind) { - if(reuseAddr) + Ipv4 myAddr = addressToBind; + for(int i = 0; i < 2000; ++i) { - /* - if(UDT::bind(socket, (sockaddr*)&addressToBind.address, sizeof(addressToBind.address)) == UDT::ERROR) + if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR) { - std::string errMsg = "UDT: Failed to bind, error: "; - errMsg += UDT::getlasterror_desc(); - throw SocketCreateException(errMsg); + port = (u16)generateRandomNumber(2000, 32000); + Log::warn("DirectConnections::createSocket: failed to bind socket to port %d, trying port %d. Fail reason: %s", myAddr.getPort(), port, UDT::getlasterror_desc()); + myAddr.address.sin_port = htons(port); } - */ - Ipv4 myAddr = addressToBind; - for(int i = 0; i < 2000; ++i) - { - if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR) - { - u16 newPort = (u16)generateRandomNumber(2000, 32000); - Log::warn("DirectConnections: failed to bind socket to port %d, trying port %d. Fail reason: %s", port, newPort, UDT::getlasterror_desc()); - port = newPort; - myAddr.address.sin_port = htons(port); - } - else - return socket; - } - throw SocketCreateException("UDT: Failed to bind after 2000 tries"); - } - else - { - Ipv4 myAddr = addressToBind; - for(int i = 0; i < 2000; ++i) - { - if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR) - { - port = (u16)generateRandomNumber(2000, 32000); - myAddr.address.sin_port = htons(port); - } - else - return socket; - } - throw SocketCreateException("UDT: Failed to bind after 2000 tries"); + else + return socket; } + throw SocketCreateException("UDT: Failed to bind after 2000 tries"); } return socket; diff --git a/src/Log.cpp b/src/Log.cpp index df424f0..a7593ec 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -13,9 +13,9 @@ namespace sibs std::lock_guard<std::mutex> lock(mutexLog); va_list args; va_start(args, fmt); - fputs("\033[1;32mDebug:\033[0m ", stdout); - vfprintf(stdout, fmt, args); - fputs("\n", stdout); + fputs("\033[1;32mDebug:\033[0m ", stderr); + vfprintf(stderr, fmt, args); + fputs("\n", stderr); va_end(args); } @@ -24,9 +24,9 @@ namespace sibs std::lock_guard<std::mutex> lock(mutexLog); va_list args; va_start(args, fmt); - fputs("\033[1;33mWarning:\033[0m ", stdout); - vfprintf(stdout, fmt, args); - fputs("\n", stdout); + fputs("\033[1;33mWarning:\033[0m ", stderr); + vfprintf(stderr, fmt, args); + fputs("\n", stderr); va_end(args); } diff --git a/tests/main.cpp b/tests/main.cpp index 2d1d505..d6c37e2 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -36,12 +36,10 @@ int main() gotAsdf2 = true; return true; }); - // wait until connection1 and connection2 receive each other as peers from bootstrap node - std::this_thread::sleep_for(std::chrono::seconds(3)); - + connection1.put(key, "hello", 5); - connection1.put(key, "asdf", 4); - std::this_thread::sleep_for(std::chrono::seconds(3)); + connection2.put(key, "asdf", 4); + std::this_thread::sleep_for(std::chrono::seconds(6)); REQUIRE(gotData1); REQUIRE(gotAsdf1); |