aboutsummaryrefslogtreecommitdiff
path: root/src/DirectConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/DirectConnection.cpp')
-rw-r--r--src/DirectConnection.cpp68
1 files changed, 35 insertions, 33 deletions
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index f8ff0f2..8034c50 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -52,16 +52,16 @@ namespace sibs
DirectConnections::~DirectConnections()
{
alive = false;
+ receiveDataThread.join();
peers.clear();
UDT::epoll_release(eid);
UDT::cleanup();
- receiveDataThread.join();
}
std::unique_ptr<Socket> DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind)
{
Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort());
- UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_STREAM, 0);
+ UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_DGRAM, 0);
if(udtSocket == UDT::INVALID_SOCK)
{
std::string errMsg = "UDT: Failed to create socket, error: ";
@@ -198,10 +198,12 @@ namespace sibs
if(data->getDataSize() > 819200) // 800kb
return false;
+ Log::debug("DirectConnections::send: sending %d bytes to %s:%d", data->getRawSize(), peer->address.getAddress().c_str(), peer->address.getPort());
// TODO: Replace this with light-weight threads (fibers)?
std::thread([peer, data, sendDataCallbackFunc]()
{
- int sentSize = UDT::send(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), 0);
+ const int one_min_ms = 1000 * 60;
+ int sentSize = UDT::sendmsg(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), one_min_ms, true);
if(sentSize == UDT::ERROR)
{
if(sendDataCallbackFunc)
@@ -285,6 +287,7 @@ namespace sibs
while(alive)
{
removeDisconnectedPeers();
+ readfds.clear();
int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 1000);
if(numfsReady == 0)
{
@@ -318,7 +321,7 @@ namespace sibs
peersMutex.unlock();
try
{
- Log::debug("DirectConnection: Received data from peer: (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ Log::debug("DirectConnections::receiveData: Received %d bytes from peer: (ip: %s, port: %d)", receivedTotalSize, peer->address.getAddress().c_str(), peer->address.getPort());
if(peer->receiveDataCallbackFunc && receivedTotalSize > 0)
{
static_assert(sizeof(MessageType) == sizeof(u8), "");
@@ -330,6 +333,10 @@ namespace sibs
{
Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what());
}
+ catch(...)
+ {
+ Log::error("UDT: Receive callback function threw exception, ignoring...");
+ }
}
else if(receivedDataStatus == CUDTException::EINVSOCK)
{
@@ -350,42 +357,37 @@ namespace sibs
Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer);
}
}
- readfds.clear();
}
}
int DirectConnections::receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize)
{
*receivedTotalSize = 0;
- while(*receivedTotalSize < MAX_RECEIVED_DATA_SIZE)
+ int dataAvailableSize;
+ int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
+ if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
{
- int dataAvailableSize;
- int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
- if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
- {
- Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
- return UDT::getlasterror_code();
- }
-
- if(dataAvailableSize == 0)
- return 0;
-
- int receivedSize = UDT::recv(socket, &output[*receivedTotalSize], MAX_RECEIVED_DATA_SIZE - *receivedTotalSize, 0);
- if(receivedSize == UDT::ERROR)
- {
- Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
- return UDT::getlasterror_code();
- }
-
- if(receivedSize == 0)
- {
- return 0;
- }
-
- (*receivedTotalSize) += receivedSize;
+ Log::error("DirectConnections::receiveDataFromPeer: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
+ }
+
+ if(dataAvailableSize == 0)
+ return 0;
+
+ if(dataAvailableSize > MAX_RECEIVED_DATA_SIZE)
+ {
+ Log::error("DirectConnections::receiveDataFromPeer: Received too much data, ignoring...");
+ return 0;
+ }
+
+ int receivedSize = UDT::recvmsg(socket, &output[0], MAX_RECEIVED_DATA_SIZE);
+ if(receivedSize == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
}
- Log::error("UDT: Received too much data, ignoring...");
+ (*receivedTotalSize) = receivedSize;
return 0;
}
@@ -394,7 +396,7 @@ namespace sibs
sibs::SafeSerializer serializer;
for(const auto &it : peers)
{
- serializer.add((u32)it->address.address.sin_family);
+ serializer.add((u16)it->address.address.sin_family);
serializer.add((u32)it->address.address.sin_addr.s_addr);
serializer.add((u16)it->address.address.sin_port);
}
@@ -408,7 +410,7 @@ namespace sibs
while(!deserializer.empty())
{
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
- peer->address.address.sin_family = deserializer.extract<u32>();
+ peer->address.address.sin_family = deserializer.extract<u16>();
peer->address.address.sin_addr.s_addr = deserializer.extract<u32>();
peer->address.address.sin_port = deserializer.extract<u16>();
result.push_back(peer);