aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-06-08 04:16:48 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commit5d42dd6a18e3b8b6eb46739b8a1d15997e229de2 (patch)
treefea25599cbaa650011489c2685d3299cdbdbd743 /src
parent824e669ddb12f1c30074c84eb731fa598f92ccfa (diff)
Make connect asynchronous
Diffstat (limited to 'src')
-rw-r--r--src/BootstrapConnection.cpp42
-rw-r--r--src/DirectConnection.cpp64
2 files changed, 77 insertions, 29 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 395e9e0..2920440 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -6,7 +6,28 @@ namespace sibs
{
BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress)
{
- serverPeer = connections.connectServer(bootstrapAddress, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ PubSubResult connectResult = PubSubResult::OK;
+ std::string connectResultStr;
+ bool connected = false;
+ connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)
+ {
+ serverPeer = peer;
+ connectResult = result;
+ connectResultStr = resultStr;
+ connected = true;
+ }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+
+ while(!connected)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+
+ if(connectResult != PubSubResult::OK)
+ {
+ std::string errMsg = "Failed to connect to bootstrap node, error: ";
+ errMsg += connectResultStr;
+ throw BootstrapConnectionException(errMsg);
+ }
}
// TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key
@@ -27,7 +48,6 @@ namespace sibs
auto listenerCallbackFunc = listenerFuncIt->second;
listenerCallbackFuncMutex.unlock();
- auto &peers = subscribedPeers[pubsubKey];
while(!deserializer.empty())
{
sa_family_t addressFamily = deserializer.extract<u32>();
@@ -41,8 +61,17 @@ namespace sibs
newPeerAddress.address.sin_port = port;
memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
// TODO: Move connection to thread and add callback function, just like @receiveData and @send
- std::shared_ptr<DirectConnectionPeer> newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- peers.push_back(newPeer);
+ connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> newPeer, PubSubResult result, const std::string &resultStr)
+ {
+ if(result == PubSubResult::OK)
+ {
+ subscribedPeersMutex.lock();
+ subscribedPeers[pubsubKey].push_back(newPeer);
+ subscribedPeersMutex.unlock();
+ }
+ else
+ Log::error("UDT: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str());
+ }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}
else
Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
@@ -75,9 +104,14 @@ namespace sibs
listenCallbackFuncIt->second(data->data(), data->size());
}
+ subscribedPeersMutex.lock();
auto peersIt = subscribedPeers.find(pubsubKey);
if(peersIt == subscribedPeers.end())
+ {
+ subscribedPeersMutex.unlock();
return;
+ }
+ subscribedPeersMutex.unlock();
for(auto &peer : peersIt->second)
{
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 748a936..d10c990 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -47,7 +47,7 @@ namespace sibs
{
std::string errMsg = "UDT: Failed to create socket, error: ";
errMsg += UDT::getlasterror_desc();
- throw ConnectionException(errMsg);
+ throw SocketCreateException(errMsg);
}
UDT::setsockopt(socket, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));
@@ -64,43 +64,57 @@ namespace sibs
{
std::string errMsg = "UDT: Failed to bind, error: ";
errMsg += UDT::getlasterror_desc();
- throw ConnectionException(errMsg);
+ throw SocketCreateException(errMsg);
}
return socket;
}
- std::shared_ptr<DirectConnectionPeer> DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ void DirectConnections::connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- return connect(address, false, true, receiveDataCallbackFunc);
+ connect(address, false, true, connectCallbackFunc, receiveDataCallbackFunc);
}
- std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ void DirectConnections::connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- return connect(address, true, true, receiveDataCallbackFunc);
+ connect(address, true, true, connectCallbackFunc, receiveDataCallbackFunc);
}
- std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ void DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr);
-
- if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
+ std::thread([this, address, rendezvous, reuseAddr, connectCallbackFunc, receiveDataCallbackFunc]()
{
- UDT::close(socket);
- std::string errMsg = "UDT: Failed to connect, error: ";
- errMsg += UDT::getlasterror_desc();
- throw ConnectionException(errMsg);
- }
-
- UDT::epoll_add_usock(eid, socket);
- std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
- peer->socket = socket;
- peer->address = address;
- peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
- peersMutex.lock();
- peers[socket] = peer;
- peersMutex.unlock();
- return peer;
+ std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
+ UDTSOCKET socket;
+ try
+ {
+ socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr);
+ }
+ catch(SocketCreateException &e)
+ {
+ if(connectCallbackFunc)
+ connectCallbackFunc(peer, PubSubResult::ERROR, e.what());
+ return;
+ }
+
+ if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
+ {
+ if(connectCallbackFunc)
+ connectCallbackFunc(peer, PubSubResult::ERROR, UDT::getlasterror_desc());
+ return;
+ }
+
+ UDT::epoll_add_usock(eid, socket);
+ peer->socket = socket;
+ peer->address = address;
+ peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
+ peersMutex.lock();
+ peers[socket] = peer;
+ peersMutex.unlock();
+
+ if(connectCallbackFunc)
+ connectCallbackFunc(peer, PubSubResult::OK, "");
+ }).detach();
}
void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc)