From 214150db03411d90d6aee4044386d9f671304f3e Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Thu, 7 Jun 2018 22:00:42 +0200 Subject: Add bootstrap node, listen method --- src/DirectConnection.cpp | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) (limited to 'src/DirectConnection.cpp') diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 86b1aa3..6d9fe27 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -70,9 +70,19 @@ namespace sibs return socket; } + std::shared_ptr DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) + { + return connect(address, false, true, receiveDataCallbackFunc); + } + std::shared_ptr DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc) { - UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true); + return connect(address, true, true, receiveDataCallbackFunc); + } + + std::shared_ptr DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc) + { + UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr); if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { @@ -85,6 +95,7 @@ namespace sibs UDT::epoll_add_usock(eid, socket); std::shared_ptr peer = std::make_shared(); peer->socket = socket; + peer->address = address; peer->receiveDataCallbackFunc = receiveDataCallbackFunc; peersMutex.lock(); peers[socket] = peer; @@ -92,20 +103,24 @@ namespace sibs return peer; } - void DirectConnections::send(const std::shared_ptr &peer, const void *data, const usize size) + void DirectConnections::send(const std::shared_ptr &peer, std::shared_ptr> data, PubSubSendDataCallback sendDataCallbackFunc) { - usize sentSizeTotal = 0; - while(sentSizeTotal < size) + // TODO: Replace this with light-weight threads (fibers)? + std::thread([peer, data, sendDataCallbackFunc]() { - int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0); - if(sentSize == UDT::ERROR) + usize sentSizeTotal = 0; + while(sentSizeTotal < data->size()) { - std::string errMsg = "UDT: Failed to send data, error: "; - errMsg += UDT::getlasterror_desc(); - throw SendException(errMsg); + int sentSize = UDT::send(peer->socket, (char*)data->data() + sentSizeTotal, data->size() - sentSizeTotal, 0); + if(sentSize == UDT::ERROR) + { + if(sendDataCallbackFunc) + sendDataCallbackFunc(PubSubResult::ERROR, UDT::getlasterror_desc()); + } + sentSizeTotal += sentSize; } - sentSizeTotal += sentSize; - } + sendDataCallbackFunc(PubSubResult::OK, ""); + }).detach(); } void DirectConnections::receiveData() @@ -141,7 +156,7 @@ namespace sibs try { if(peer->receiveDataCallbackFunc) - peer->receiveDataCallbackFunc(data.data(), data.size()); + peer->receiveDataCallbackFunc(peer, data.data(), data.size()); } catch(std::exception &e) { -- cgit v1.2.3