From e66f3b1adec087dd7d47c6e25d26961d768ee3b8 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sun, 25 Nov 2018 18:58:15 +0100 Subject: Fix routing of data --- src/BootstrapConnection.cpp | 31 +++++++++++++++---------------- src/DirectConnection.cpp | 7 +++++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 1e1ac43..df0a947 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -123,29 +123,28 @@ namespace sibs Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { - if(result == PubSubResult::RESULT_OK) + std::lock_guard lock(subscribeDataMutex); + auto subscribeDataIt = subscribeData.find(pubsubKey); + if(subscribeDataIt == subscribeData.end()) + { + Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + return; + } + + for(auto existingPeer : subscribeDataIt->second.peers) { - std::lock_guard lock(subscribeDataMutex); - auto subscribeDataIt = subscribeData.find(pubsubKey); - if(subscribeDataIt == subscribeData.end()) + if(peer == existingPeer) { - Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + Log::debug("BootstrapConnection::receiveDataFromServer: Already connected to peer (ip: %s, port: %d) for pubsub key %s", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); return; } + } - for(auto existingPeer : subscribeDataIt->second.peers) - { - if(peer == existingPeer) - { - Log::debug("BootstrapConnection::receiveDataFromServer: Already connected to peer (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); - return; - } - } - subscribeDataIt->second.peers.push_back(peer); + subscribeDataIt->second.peers.push_back(peer); + if(result == PubSubResult::RESULT_OK) Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); - } else - Log::error("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); + Log::warn("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s. Routing data through bootstrap node", resultStr.c_str()); }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index d92c972..5c072d5 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -171,7 +171,7 @@ namespace sibs peersMutex.unlock(); } - Log::debug("DirectConnections: Connecting to peer (ip: %s, port: %d, rendezvous: %s)", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no"); + Log::debug("DirectConnections: Connecting to %s peer (ip: %s, port: %d, rendezvous: %s)", server ? "server" : "client", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no"); if(UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { if(connectCallbackFunc) @@ -373,7 +373,10 @@ namespace sibs if(removePeer(receivedDataFromPeer)) Log::debug("UDT: Removed peer socket %d", receivedDataFromPeer); else - Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer); + { + Log::error("UDT: Failed to remove peer socket %d, system said we got event from it but we are not connected to it", receivedDataFromPeer); + UDT::epoll_remove_usock(eid, receivedDataFromPeer); + } } } } -- cgit v1.2.3