From a14096f4aec0c0af285a6fcb0d368001666b8765 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 19 Oct 2018 14:19:12 +0200 Subject: Use sendmsg and recvmsg instead of send and recv to receive one message at a time --- src/BootstrapConnection.cpp | 22 +++++++-------- src/BootstrapNode.cpp | 7 ++--- src/DirectConnection.cpp | 68 +++++++++++++++++++++++---------------------- tests/main.cpp | 13 +++++++-- 4 files changed, 60 insertions(+), 50 deletions(-) diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 995fdcd..35bb11e 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -50,11 +50,11 @@ namespace sibs { if(messageType != MessageType::SUBSCRIBE) { - Log::warn("BootstrapConnection: received message from server that was not subscribe"); + Log::warn("BootstrapConnection::receiveDataFromServer: received message from server that was not subscribe"); return; } - Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node"); + Log::debug("BootstrapConnection::receiveDataFromServer: Received subscriber(s) from bootstrap node"); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); @@ -64,13 +64,13 @@ namespace sibs auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenerFuncIt == listenCallbackFuncs.end()) { - Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } while(!deserializer.empty()) { - sa_family_t addressFamily = deserializer.extract(); + sa_family_t addressFamily = deserializer.extract(); if(addressFamily == AF_INET) { in_addr_t ipv4Address = deserializer.extract(); @@ -80,7 +80,7 @@ namespace sibs newPeerAddress.address.sin_addr.s_addr = ipv4Address; newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); - Log::debug("BootstrapConnection: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); + Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { if(result == PubSubResult::OK) @@ -88,15 +88,15 @@ namespace sibs std::lock_guard lock(subscribedPeersMutex); subscribedPeers[pubsubKey].push_back(peer); ++peer->sharedKeys; - Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); + Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } else - Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); + Log::error("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else { - Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); + Log::error("BootstrapConnection::receiveDataFromServer: Unknown address family: %d", addressFamily); return; } } @@ -116,7 +116,7 @@ namespace sibs if(listenerFuncIt == listenCallbackFuncs.end()) { listenerCallbackFuncMutex.unlock(); - Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } auto listenCallbackFunc = listenerFuncIt->second; @@ -131,7 +131,7 @@ namespace sibs } else if(messageType == MessageType::UNSUBSCRIBE) { - Log::debug("BootstrapConnection: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); + Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); std::lock_guard subscribersMutex(subscribedPeersMutex); auto peersListIt = subscribedPeers.find(pubsubKey); if(peersListIt == subscribedPeers.end()) @@ -152,7 +152,7 @@ namespace sibs } else { - Log::warn("BootstrapConnection: received message from peer that was not data or unsubscribe"); + Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe"); } } diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index 5c62e64..81df6d7 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -48,7 +48,6 @@ namespace sibs BootstrapNode::~BootstrapNode() { - std::lock_guard lock(connections.peersMutex); connections.alive = false; socket.reset(); acceptConnectionsThread.join(); @@ -94,7 +93,7 @@ namespace sibs { sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; - deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); if(messageType == MessageType::SUBSCRIBE) { @@ -108,7 +107,7 @@ namespace sibs } sibs::SafeSerializer serializer; - serializer.add((u32)peer->address.address.sin_family); + serializer.add((u16)peer->address.address.sin_family); serializer.add((u32)peer->address.address.sin_addr.s_addr); serializer.add((u16)peer->address.address.sin_port); auto newPeerMessage = std::make_shared(MessageType::SUBSCRIBE); @@ -126,7 +125,7 @@ namespace sibs sibs::SafeSerializer newPeerSerializer; for(auto &existingPeer : peers) { - newPeerSerializer.add((u32)existingPeer->address.address.sin_family); + newPeerSerializer.add((u16)existingPeer->address.address.sin_family); newPeerSerializer.add((u32)existingPeer->address.address.sin_addr.s_addr); newPeerSerializer.add((u16)existingPeer->address.address.sin_port); } diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index f8ff0f2..8034c50 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -52,16 +52,16 @@ namespace sibs DirectConnections::~DirectConnections() { alive = false; + receiveDataThread.join(); peers.clear(); UDT::epoll_release(eid); UDT::cleanup(); - receiveDataThread.join(); } std::unique_ptr DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind) { Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort()); - UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_STREAM, 0); + UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_DGRAM, 0); if(udtSocket == UDT::INVALID_SOCK) { std::string errMsg = "UDT: Failed to create socket, error: "; @@ -198,10 +198,12 @@ namespace sibs if(data->getDataSize() > 819200) // 800kb return false; + Log::debug("DirectConnections::send: sending %d bytes to %s:%d", data->getRawSize(), peer->address.getAddress().c_str(), peer->address.getPort()); // TODO: Replace this with light-weight threads (fibers)? std::thread([peer, data, sendDataCallbackFunc]() { - int sentSize = UDT::send(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), 0); + const int one_min_ms = 1000 * 60; + int sentSize = UDT::sendmsg(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), one_min_ms, true); if(sentSize == UDT::ERROR) { if(sendDataCallbackFunc) @@ -285,6 +287,7 @@ namespace sibs while(alive) { removeDisconnectedPeers(); + readfds.clear(); int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 1000); if(numfsReady == 0) { @@ -318,7 +321,7 @@ namespace sibs peersMutex.unlock(); try { - Log::debug("DirectConnection: Received data from peer: (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + Log::debug("DirectConnections::receiveData: Received %d bytes from peer: (ip: %s, port: %d)", receivedTotalSize, peer->address.getAddress().c_str(), peer->address.getPort()); if(peer->receiveDataCallbackFunc && receivedTotalSize > 0) { static_assert(sizeof(MessageType) == sizeof(u8), ""); @@ -330,6 +333,10 @@ namespace sibs { Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what()); } + catch(...) + { + Log::error("UDT: Receive callback function threw exception, ignoring..."); + } } else if(receivedDataStatus == CUDTException::EINVSOCK) { @@ -350,42 +357,37 @@ namespace sibs Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer); } } - readfds.clear(); } } int DirectConnections::receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize) { *receivedTotalSize = 0; - while(*receivedTotalSize < MAX_RECEIVED_DATA_SIZE) + int dataAvailableSize; + int receiveSizeDataTypeSize = sizeof(dataAvailableSize); + if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) { - int dataAvailableSize; - int receiveSizeDataTypeSize = sizeof(dataAvailableSize); - if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) - { - Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); - return UDT::getlasterror_code(); - } - - if(dataAvailableSize == 0) - return 0; - - int receivedSize = UDT::recv(socket, &output[*receivedTotalSize], MAX_RECEIVED_DATA_SIZE - *receivedTotalSize, 0); - if(receivedSize == UDT::ERROR) - { - Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); - return UDT::getlasterror_code(); - } - - if(receivedSize == 0) - { - return 0; - } - - (*receivedTotalSize) += receivedSize; + Log::error("DirectConnections::receiveDataFromPeer: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); + } + + if(dataAvailableSize == 0) + return 0; + + if(dataAvailableSize > MAX_RECEIVED_DATA_SIZE) + { + Log::error("DirectConnections::receiveDataFromPeer: Received too much data, ignoring..."); + return 0; + } + + int receivedSize = UDT::recvmsg(socket, &output[0], MAX_RECEIVED_DATA_SIZE); + if(receivedSize == UDT::ERROR) + { + Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); } - Log::error("UDT: Received too much data, ignoring..."); + (*receivedTotalSize) = receivedSize; return 0; } @@ -394,7 +396,7 @@ namespace sibs sibs::SafeSerializer serializer; for(const auto &it : peers) { - serializer.add((u32)it->address.address.sin_family); + serializer.add((u16)it->address.address.sin_family); serializer.add((u32)it->address.address.sin_addr.s_addr); serializer.add((u16)it->address.address.sin_port); } @@ -408,7 +410,7 @@ namespace sibs while(!deserializer.empty()) { std::shared_ptr peer = std::make_shared(); - peer->address.address.sin_family = deserializer.extract(); + peer->address.address.sin_family = deserializer.extract(); peer->address.address.sin_addr.s_addr = deserializer.extract(); peer->address.address.sin_port = deserializer.extract(); result.push_back(peer); diff --git a/tests/main.cpp b/tests/main.cpp index d650128..2d1d505 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -15,29 +15,38 @@ int main() sibs::BootstrapConnection connection1(sibs::Ipv4("127.0.0.1", PORT)); bool gotData1 = false; - connection1.listen(key, [&gotData1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + bool gotAsdf1 = false; + connection1.listen(key, [&gotData1, &gotAsdf1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) { if(size == 5 && strncmp((const char*)data, "hello", 5) == 0) gotData1 = true; + if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0) + gotAsdf1 = true; return true; }); sibs::BootstrapConnection connection2(sibs::Ipv4("127.0.0.1", PORT)); bool gotData2 = false; - connection2.listen(key, [&gotData2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + bool gotAsdf2 = false; + connection2.listen(key, [&gotData2, &gotAsdf2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) { if(size == 5 && strncmp((const char*)data, "hello", 5) == 0) gotData2 = true; + if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0) + gotAsdf2 = true; return true; }); // wait until connection1 and connection2 receive each other as peers from bootstrap node std::this_thread::sleep_for(std::chrono::seconds(3)); connection1.put(key, "hello", 5); + connection1.put(key, "asdf", 4); std::this_thread::sleep_for(std::chrono::seconds(3)); REQUIRE(gotData1); + REQUIRE(gotAsdf1); REQUIRE(gotData2); + REQUIRE(gotAsdf2); return 0; } -- cgit v1.2.3