From a14096f4aec0c0af285a6fcb0d368001666b8765 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 19 Oct 2018 14:19:12 +0200 Subject: Use sendmsg and recvmsg instead of send and recv to receive one message at a time --- src/DirectConnection.cpp | 68 +++++++++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 33 deletions(-) (limited to 'src/DirectConnection.cpp') diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index f8ff0f2..8034c50 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -52,16 +52,16 @@ namespace sibs DirectConnections::~DirectConnections() { alive = false; + receiveDataThread.join(); peers.clear(); UDT::epoll_release(eid); UDT::cleanup(); - receiveDataThread.join(); } std::unique_ptr DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind) { Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort()); - UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_STREAM, 0); + UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_DGRAM, 0); if(udtSocket == UDT::INVALID_SOCK) { std::string errMsg = "UDT: Failed to create socket, error: "; @@ -198,10 +198,12 @@ namespace sibs if(data->getDataSize() > 819200) // 800kb return false; + Log::debug("DirectConnections::send: sending %d bytes to %s:%d", data->getRawSize(), peer->address.getAddress().c_str(), peer->address.getPort()); // TODO: Replace this with light-weight threads (fibers)? std::thread([peer, data, sendDataCallbackFunc]() { - int sentSize = UDT::send(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), 0); + const int one_min_ms = 1000 * 60; + int sentSize = UDT::sendmsg(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), one_min_ms, true); if(sentSize == UDT::ERROR) { if(sendDataCallbackFunc) @@ -285,6 +287,7 @@ namespace sibs while(alive) { removeDisconnectedPeers(); + readfds.clear(); int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 1000); if(numfsReady == 0) { @@ -318,7 +321,7 @@ namespace sibs peersMutex.unlock(); try { - Log::debug("DirectConnection: Received data from peer: (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + Log::debug("DirectConnections::receiveData: Received %d bytes from peer: (ip: %s, port: %d)", receivedTotalSize, peer->address.getAddress().c_str(), peer->address.getPort()); if(peer->receiveDataCallbackFunc && receivedTotalSize > 0) { static_assert(sizeof(MessageType) == sizeof(u8), ""); @@ -330,6 +333,10 @@ namespace sibs { Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what()); } + catch(...) + { + Log::error("UDT: Receive callback function threw exception, ignoring..."); + } } else if(receivedDataStatus == CUDTException::EINVSOCK) { @@ -350,42 +357,37 @@ namespace sibs Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer); } } - readfds.clear(); } } int DirectConnections::receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize) { *receivedTotalSize = 0; - while(*receivedTotalSize < MAX_RECEIVED_DATA_SIZE) + int dataAvailableSize; + int receiveSizeDataTypeSize = sizeof(dataAvailableSize); + if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) { - int dataAvailableSize; - int receiveSizeDataTypeSize = sizeof(dataAvailableSize); - if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) - { - Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); - return UDT::getlasterror_code(); - } - - if(dataAvailableSize == 0) - return 0; - - int receivedSize = UDT::recv(socket, &output[*receivedTotalSize], MAX_RECEIVED_DATA_SIZE - *receivedTotalSize, 0); - if(receivedSize == UDT::ERROR) - { - Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); - return UDT::getlasterror_code(); - } - - if(receivedSize == 0) - { - return 0; - } - - (*receivedTotalSize) += receivedSize; + Log::error("DirectConnections::receiveDataFromPeer: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); + } + + if(dataAvailableSize == 0) + return 0; + + if(dataAvailableSize > MAX_RECEIVED_DATA_SIZE) + { + Log::error("DirectConnections::receiveDataFromPeer: Received too much data, ignoring..."); + return 0; + } + + int receivedSize = UDT::recvmsg(socket, &output[0], MAX_RECEIVED_DATA_SIZE); + if(receivedSize == UDT::ERROR) + { + Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); } - Log::error("UDT: Received too much data, ignoring..."); + (*receivedTotalSize) = receivedSize; return 0; } @@ -394,7 +396,7 @@ namespace sibs sibs::SafeSerializer serializer; for(const auto &it : peers) { - serializer.add((u32)it->address.address.sin_family); + serializer.add((u16)it->address.address.sin_family); serializer.add((u32)it->address.address.sin_addr.s_addr); serializer.add((u16)it->address.address.sin_port); } @@ -408,7 +410,7 @@ namespace sibs while(!deserializer.empty()) { std::shared_ptr peer = std::make_shared(); - peer->address.address.sin_family = deserializer.extract(); + peer->address.address.sin_family = deserializer.extract(); peer->address.address.sin_addr.s_addr = deserializer.extract(); peer->address.address.sin_port = deserializer.extract(); result.push_back(peer); -- cgit v1.2.3