From e36509911c34a376c04498d47016002b1badbead Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Fri, 8 Jun 2018 04:16:48 +0200 Subject: Make connect asynchronous --- src/BootstrapConnection.cpp | 42 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) (limited to 'src/BootstrapConnection.cpp') diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 395e9e0..2920440 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -6,7 +6,28 @@ namespace sibs { BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) { - serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + PubSubResult connectResult = PubSubResult::OK; + std::string connectResultStr; + bool connected = false; + connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) + { + serverPeer = peer; + connectResult = result; + connectResultStr = resultStr; + connected = true; + }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + + while(!connected) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + if(connectResult != PubSubResult::OK) + { + std::string errMsg = "Failed to connect to bootstrap node, error: "; + errMsg += connectResultStr; + throw BootstrapConnectionException(errMsg); + } } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key @@ -27,7 +48,6 @@ namespace sibs auto listenerCallbackFunc = listenerFuncIt->second; listenerCallbackFuncMutex.unlock(); - auto &peers = subscribedPeers[pubsubKey]; while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); @@ -41,8 +61,17 @@ namespace sibs newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); // TODO: Move connection to thread and add callback function, just like @receiveData and @send - std::shared_ptr newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - peers.push_back(newPeer); + connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr newPeer, PubSubResult result, const std::string &resultStr) + { + if(result == PubSubResult::OK) + { + subscribedPeersMutex.lock(); + subscribedPeers[pubsubKey].push_back(newPeer); + subscribedPeersMutex.unlock(); + } + else + Log::error("UDT: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); + }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } else Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); @@ -75,9 +104,14 @@ namespace sibs listenCallbackFuncIt->second(data->data(), data->size()); } + subscribedPeersMutex.lock(); auto peersIt = subscribedPeers.find(pubsubKey); if(peersIt == subscribedPeers.end()) + { + subscribedPeersMutex.unlock(); return; + } + subscribedPeersMutex.unlock(); for(auto &peer : peersIt->second) { -- cgit v1.2.3