aboutsummaryrefslogtreecommitdiff
path: root/src/DirectConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/DirectConnection.cpp')
-rw-r--r--src/DirectConnection.cpp39
1 files changed, 27 insertions, 12 deletions
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 86b1aa3..6d9fe27 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -70,9 +70,19 @@ namespace sibs
return socket;
}
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connectServer(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ return connect(address, false, true, receiveDataCallbackFunc);
+ }
+
std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, PubSubReceiveDataCallback receiveDataCallbackFunc)
{
- UDTSOCKET socket = createSocket(Ipv4(nullptr, port), true, true);
+ return connect(address, true, true, receiveDataCallbackFunc);
+ }
+
+ std::shared_ptr<DirectConnectionPeer> DirectConnections::connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc)
+ {
+ UDTSOCKET socket = createSocket(Ipv4(nullptr, port), rendezvous, reuseAddr);
if(UDT::connect(socket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
@@ -85,6 +95,7 @@ namespace sibs
UDT::epoll_add_usock(eid, socket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = socket;
+ peer->address = address;
peer->receiveDataCallbackFunc = receiveDataCallbackFunc;
peersMutex.lock();
peers[socket] = peer;
@@ -92,20 +103,24 @@ namespace sibs
return peer;
}
- void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, const void *data, const usize size)
+ void DirectConnections::send(const std::shared_ptr<DirectConnectionPeer> &peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc)
{
- usize sentSizeTotal = 0;
- while(sentSizeTotal < size)
+ // TODO: Replace this with light-weight threads (fibers)?
+ std::thread([peer, data, sendDataCallbackFunc]()
{
- int sentSize = UDT::send(peer->socket, (char*)data + sentSizeTotal, size - sentSizeTotal, 0);
- if(sentSize == UDT::ERROR)
+ usize sentSizeTotal = 0;
+ while(sentSizeTotal < data->size())
{
- std::string errMsg = "UDT: Failed to send data, error: ";
- errMsg += UDT::getlasterror_desc();
- throw SendException(errMsg);
+ int sentSize = UDT::send(peer->socket, (char*)data->data() + sentSizeTotal, data->size() - sentSizeTotal, 0);
+ if(sentSize == UDT::ERROR)
+ {
+ if(sendDataCallbackFunc)
+ sendDataCallbackFunc(PubSubResult::ERROR, UDT::getlasterror_desc());
+ }
+ sentSizeTotal += sentSize;
}
- sentSizeTotal += sentSize;
- }
+ sendDataCallbackFunc(PubSubResult::OK, "");
+ }).detach();
}
void DirectConnections::receiveData()
@@ -141,7 +156,7 @@ namespace sibs
try
{
if(peer->receiveDataCallbackFunc)
- peer->receiveDataCallbackFunc(data.data(), data.size());
+ peer->receiveDataCallbackFunc(peer, data.data(), data.size());
}
catch(std::exception &e)
{