aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-19 14:19:12 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-19 14:19:15 +0200
commit3565289c19974ca874f87429cc74a87558249c8e (patch)
treef3afeb061700e5e168dbee02e7694e3bc97be977
parenta4e128e5e46c9fcf00fd874c8386988f9d3b73fa (diff)
Use sendmsg and recvmsg instead of send and recv to receive one message at a time
-rw-r--r--src/BootstrapConnection.cpp22
-rw-r--r--src/BootstrapNode.cpp7
-rw-r--r--src/DirectConnection.cpp68
-rw-r--r--tests/main.cpp13
4 files changed, 60 insertions, 50 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 995fdcd..35bb11e 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -50,11 +50,11 @@ namespace sibs
{
if(messageType != MessageType::SUBSCRIBE)
{
- Log::warn("BootstrapConnection: received message from server that was not subscribe");
+ Log::warn("BootstrapConnection::receiveDataFromServer: received message from server that was not subscribe");
return;
}
- Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node");
+ Log::debug("BootstrapConnection::receiveDataFromServer: Received subscriber(s) from bootstrap node");
sibs::SafeDeserializer deserializer((const u8*)data, size);
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
@@ -64,13 +64,13 @@ namespace sibs
auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
if(listenerFuncIt == listenCallbackFuncs.end())
{
- Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
return;
}
while(!deserializer.empty())
{
- sa_family_t addressFamily = deserializer.extract<u32>();
+ sa_family_t addressFamily = deserializer.extract<u16>();
if(addressFamily == AF_INET)
{
in_addr_t ipv4Address = deserializer.extract<u32>();
@@ -80,7 +80,7 @@ namespace sibs
newPeerAddress.address.sin_addr.s_addr = ipv4Address;
newPeerAddress.address.sin_port = port;
memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
- Log::debug("BootstrapConnection: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort());
+ Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort());
connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)
{
if(result == PubSubResult::OK)
@@ -88,15 +88,15 @@ namespace sibs
std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
subscribedPeers[pubsubKey].push_back(peer);
++peer->sharedKeys;
- Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
+ Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
}
else
- Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str());
+ Log::error("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str());
}, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
}
else
{
- Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
+ Log::error("BootstrapConnection::receiveDataFromServer: Unknown address family: %d", addressFamily);
return;
}
}
@@ -116,7 +116,7 @@ namespace sibs
if(listenerFuncIt == listenCallbackFuncs.end())
{
listenerCallbackFuncMutex.unlock();
- Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
return;
}
auto listenCallbackFunc = listenerFuncIt->second;
@@ -131,7 +131,7 @@ namespace sibs
}
else if(messageType == MessageType::UNSUBSCRIBE)
{
- Log::debug("BootstrapConnection: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
+ Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex);
auto peersListIt = subscribedPeers.find(pubsubKey);
if(peersListIt == subscribedPeers.end())
@@ -152,7 +152,7 @@ namespace sibs
}
else
{
- Log::warn("BootstrapConnection: received message from peer that was not data or unsubscribe");
+ Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe");
}
}
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index 5c62e64..81df6d7 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -48,7 +48,6 @@ namespace sibs
BootstrapNode::~BootstrapNode()
{
- std::lock_guard<std::mutex> lock(connections.peersMutex);
connections.alive = false;
socket.reset();
acceptConnectionsThread.join();
@@ -94,7 +93,7 @@ namespace sibs
{
sibs::SafeDeserializer deserializer((const u8*)data, size);
PubsubKey pubsubKey;
- deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
if(messageType == MessageType::SUBSCRIBE)
{
@@ -108,7 +107,7 @@ namespace sibs
}
sibs::SafeSerializer serializer;
- serializer.add((u32)peer->address.address.sin_family);
+ serializer.add((u16)peer->address.address.sin_family);
serializer.add((u32)peer->address.address.sin_addr.s_addr);
serializer.add((u16)peer->address.address.sin_port);
auto newPeerMessage = std::make_shared<Message>(MessageType::SUBSCRIBE);
@@ -126,7 +125,7 @@ namespace sibs
sibs::SafeSerializer newPeerSerializer;
for(auto &existingPeer : peers)
{
- newPeerSerializer.add((u32)existingPeer->address.address.sin_family);
+ newPeerSerializer.add((u16)existingPeer->address.address.sin_family);
newPeerSerializer.add((u32)existingPeer->address.address.sin_addr.s_addr);
newPeerSerializer.add((u16)existingPeer->address.address.sin_port);
}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index f8ff0f2..8034c50 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -52,16 +52,16 @@ namespace sibs
DirectConnections::~DirectConnections()
{
alive = false;
+ receiveDataThread.join();
peers.clear();
UDT::epoll_release(eid);
UDT::cleanup();
- receiveDataThread.join();
}
std::unique_ptr<Socket> DirectConnections::createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind)
{
Log::debug("UDT: Creating socket for ipv4 address %s, port: %d", addressToBind.getAddress().c_str(), addressToBind.getPort());
- UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_STREAM, 0);
+ UDTSOCKET udtSocket = UDT::socket(AF_INET, SOCK_DGRAM, 0);
if(udtSocket == UDT::INVALID_SOCK)
{
std::string errMsg = "UDT: Failed to create socket, error: ";
@@ -198,10 +198,12 @@ namespace sibs
if(data->getDataSize() > 819200) // 800kb
return false;
+ Log::debug("DirectConnections::send: sending %d bytes to %s:%d", data->getRawSize(), peer->address.getAddress().c_str(), peer->address.getPort());
// TODO: Replace this with light-weight threads (fibers)?
std::thread([peer, data, sendDataCallbackFunc]()
{
- int sentSize = UDT::send(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), 0);
+ const int one_min_ms = 1000 * 60;
+ int sentSize = UDT::sendmsg(peer->socket->udtSocket, (char*)data->data(), data->getRawSize(), one_min_ms, true);
if(sentSize == UDT::ERROR)
{
if(sendDataCallbackFunc)
@@ -285,6 +287,7 @@ namespace sibs
while(alive)
{
removeDisconnectedPeers();
+ readfds.clear();
int numfsReady = UDT::epoll_wait(eid, &readfds, nullptr, 1000);
if(numfsReady == 0)
{
@@ -318,7 +321,7 @@ namespace sibs
peersMutex.unlock();
try
{
- Log::debug("DirectConnection: Received data from peer: (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ Log::debug("DirectConnections::receiveData: Received %d bytes from peer: (ip: %s, port: %d)", receivedTotalSize, peer->address.getAddress().c_str(), peer->address.getPort());
if(peer->receiveDataCallbackFunc && receivedTotalSize > 0)
{
static_assert(sizeof(MessageType) == sizeof(u8), "");
@@ -330,6 +333,10 @@ namespace sibs
{
Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what());
}
+ catch(...)
+ {
+ Log::error("UDT: Receive callback function threw exception, ignoring...");
+ }
}
else if(receivedDataStatus == CUDTException::EINVSOCK)
{
@@ -350,42 +357,37 @@ namespace sibs
Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer);
}
}
- readfds.clear();
}
}
int DirectConnections::receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize)
{
*receivedTotalSize = 0;
- while(*receivedTotalSize < MAX_RECEIVED_DATA_SIZE)
+ int dataAvailableSize;
+ int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
+ if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
{
- int dataAvailableSize;
- int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
- if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
- {
- Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
- return UDT::getlasterror_code();
- }
-
- if(dataAvailableSize == 0)
- return 0;
-
- int receivedSize = UDT::recv(socket, &output[*receivedTotalSize], MAX_RECEIVED_DATA_SIZE - *receivedTotalSize, 0);
- if(receivedSize == UDT::ERROR)
- {
- Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
- return UDT::getlasterror_code();
- }
-
- if(receivedSize == 0)
- {
- return 0;
- }
-
- (*receivedTotalSize) += receivedSize;
+ Log::error("DirectConnections::receiveDataFromPeer: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
+ }
+
+ if(dataAvailableSize == 0)
+ return 0;
+
+ if(dataAvailableSize > MAX_RECEIVED_DATA_SIZE)
+ {
+ Log::error("DirectConnections::receiveDataFromPeer: Received too much data, ignoring...");
+ return 0;
+ }
+
+ int receivedSize = UDT::recvmsg(socket, &output[0], MAX_RECEIVED_DATA_SIZE);
+ if(receivedSize == UDT::ERROR)
+ {
+ Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
}
- Log::error("UDT: Received too much data, ignoring...");
+ (*receivedTotalSize) = receivedSize;
return 0;
}
@@ -394,7 +396,7 @@ namespace sibs
sibs::SafeSerializer serializer;
for(const auto &it : peers)
{
- serializer.add((u32)it->address.address.sin_family);
+ serializer.add((u16)it->address.address.sin_family);
serializer.add((u32)it->address.address.sin_addr.s_addr);
serializer.add((u16)it->address.address.sin_port);
}
@@ -408,7 +410,7 @@ namespace sibs
while(!deserializer.empty())
{
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
- peer->address.address.sin_family = deserializer.extract<u32>();
+ peer->address.address.sin_family = deserializer.extract<u16>();
peer->address.address.sin_addr.s_addr = deserializer.extract<u32>();
peer->address.address.sin_port = deserializer.extract<u16>();
result.push_back(peer);
diff --git a/tests/main.cpp b/tests/main.cpp
index d650128..2d1d505 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -15,29 +15,38 @@ int main()
sibs::BootstrapConnection connection1(sibs::Ipv4("127.0.0.1", PORT));
bool gotData1 = false;
- connection1.listen(key, [&gotData1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ bool gotAsdf1 = false;
+ connection1.listen(key, [&gotData1, &gotAsdf1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
{
if(size == 5 && strncmp((const char*)data, "hello", 5) == 0)
gotData1 = true;
+ if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0)
+ gotAsdf1 = true;
return true;
});
sibs::BootstrapConnection connection2(sibs::Ipv4("127.0.0.1", PORT));
bool gotData2 = false;
- connection2.listen(key, [&gotData2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ bool gotAsdf2 = false;
+ connection2.listen(key, [&gotData2, &gotAsdf2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
{
if(size == 5 && strncmp((const char*)data, "hello", 5) == 0)
gotData2 = true;
+ if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0)
+ gotAsdf2 = true;
return true;
});
// wait until connection1 and connection2 receive each other as peers from bootstrap node
std::this_thread::sleep_for(std::chrono::seconds(3));
connection1.put(key, "hello", 5);
+ connection1.put(key, "asdf", 4);
std::this_thread::sleep_for(std::chrono::seconds(3));
REQUIRE(gotData1);
+ REQUIRE(gotAsdf1);
REQUIRE(gotData2);
+ REQUIRE(gotAsdf2);
return 0;
}