From 5d42dd6a18e3b8b6eb46739b8a1d15997e229de2 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 8 Jun 2018 04:16:48 +0200 Subject: Make connect asynchronous --- src/BootstrapConnection.cpp | 42 ++++++++++++++++++++++++++--- src/DirectConnection.cpp | 64 +++++++++++++++++++++++++++------------------ 2 files changed, 77 insertions(+), 29 deletions(-) (limited to 'src') diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 395e9e0..2920440 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -6,7 +6,28 @@ namespace sibs { BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) { - serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + PubSubResult connectResult = PubSubResult::OK; + std::string connectResultStr; + bool connected = false; + connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) + { + serverPeer = peer; + connectResult = result; + connectResultStr = resultStr; + connected = true; + }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + + while(!connected) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + if(connectResult != PubSubResult::OK) + { + std::string errMsg = "Failed to connect to bootstrap node, error: "; + errMsg += connectResultStr; + throw BootstrapConnectionException(errMsg); + } } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key @@ -27,7 +48,6 @@ namespace sibs auto listenerCallbackFunc = listenerFuncIt->second; listenerCallbackFuncMutex.unlock(); - auto &peers = subscribedPeers[pubsubKey]; while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); @@ -41,8 +61,17 @@ namespace sibs newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); // TODO: Move connection to thread and add callback function, just like @receiveData and @send - std::shared_ptr newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); - peers.push_back(newPeer); + connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr newPeer, PubSubResult result, const std::string &resultStr) + { + if(result == PubSubResult::OK) + { + subscribedPeersMutex.lock(); + subscribedPeers[pubsubKey].push_back(newPeer); + subscribedPeersMutex.unlock(); + } + else + Log::error("UDT: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); + }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } else Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); @@ -75,9 +104,14 @@ namespace sibs listenCallbackFuncIt->second(data->data(), data->size()); } + subscribedPeersMutex.lock(); auto peersIt = subscribedPeers.find(pubsubKey); if(peersIt == subscribedPeers.end()) + { + subscribedPeersMutex.unlock(); return; + } + subscribedPeersMutex.unlock(); for(auto &peer : peersIt->second) { diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 748a936..d10c990 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -47,7 +47,7 @@ namespace sibs { std::string errMsg = "UDT: Failed to create socket, error: "; errMsg += UDT::getlasterror_desc(); - throw ConnectionException(errMsg); + throw SocketCreateException(errMsg); } UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool)); @@ -64,43 +64,57 @@ namespace sibs { std::string errMsg = "UDT: Failed to bind, error: "; errMsg += UDT::getlasterror_desc(); - throw ConnectionException(errMsg); + throw SocketCreateException(errMsg); } return socket; } - std::shared_ptr DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + void DirectConnections::connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc) { - return connect(address, false, true, receiveDataCallbackFunc); + connect(address, false, true, connectCallbackFunc, receiveDataCallbackFunc); } - std::shared_ptr DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + void DirectConnections::connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc) { - return connect(address, true, true, receiveDataCallbackFunc); + connect(address, true, true, connectCallbackFunc, receiveDataCallbackFunc); } - std::shared_ptr DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc) + void DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc) { - UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr); - - if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) + std::thread([this, address, rendezvous, reuseAddr, connectCallbackFunc, receiveDataCallbackFunc]() { - UDT::close(socket); - std::string errMsg = "UDT: Failed to connect, error: "; - errMsg += UDT::getlasterror_desc(); - throw ConnectionException(errMsg); - } - - UDT::epoll_add_usock(eid, socket); - std::shared_ptr peer = std::make_shared(); - peer->socket = socket; - peer->address = address; - peer->receiveDataCallbackFunc = receiveDataCallbackFunc; - peersMutex.lock(); - peers[socket] = peer; - peersMutex.unlock(); - return peer; + std::shared_ptr peer = std::make_shared(); + UDTSOCKET socket; + try + { + socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr); + } + catch(SocketCreateException &e) + { + if(connectCallbackFunc) + connectCallbackFunc(peer, PubSubResult::ERROR, e.what()); + return; + } + + if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) + { + if(connectCallbackFunc) + connectCallbackFunc(peer, PubSubResult::ERROR, UDT::getlasterror_desc()); + return; + } + + UDT::epoll_add_usock(eid, socket); + peer->socket = socket; + peer->address = address; + peer->receiveDataCallbackFunc = receiveDataCallbackFunc; + peersMutex.lock(); + peers[socket] = peer; + peersMutex.unlock(); + + if(connectCallbackFunc) + connectCallbackFunc(peer, PubSubResult::OK, ""); + }).detach(); } void DirectConnections::send(const std::shared_ptr &peer, std::shared_ptr> data, PubSubSendDataCallback sendDataCallbackFunc) -- cgit v1.2.3