aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/sibs/DirectConnection.hpp3
-rw-r--r--src/BootstrapNode.cpp2
-rw-r--r--src/DirectConnection.cpp52
3 files changed, 46 insertions, 11 deletions
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index 0e8b961..45ba5d9 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -57,7 +57,8 @@ namespace sibs
private:
std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc);
void receiveData();
- bool receiveDataFromPeer(const int socket, char *output);
+ int receiveDataFromPeer(const int socket, char *output);
+ bool removePeer(int peerSocket);
private:
u16 port;
int eid;
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index fb10dcc..71ff0c9 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -53,7 +53,7 @@ namespace sibs
char clientHost[NI_MAXHOST];
char clientService[NI_MAXSERV];
getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
- Log::debug("UDT: New connection: %s:%s", clientHost, clientService);
+ Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket);
UDT::epoll_add_usock(connections.eid, clientSocket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 6d9fe27..6001ebc 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -123,6 +123,21 @@ namespace sibs
}).detach();
}
+ bool DirectConnections::removePeer(int peerSocket)
+ {
+ bool wasRemoved = false;
+ peersMutex.lock();
+ auto peerIt = peers.find(peerSocket);
+ if(peerIt != peers.end())
+ {
+ UDT::epoll_remove_usock(eid, peerSocket);
+ peers.erase(peerIt);
+ wasRemoved = true;
+ }
+ peersMutex.unlock();
+ return wasRemoved;
+ }
+
void DirectConnections::receiveData()
{
std::vector<char> data;
@@ -147,8 +162,9 @@ namespace sibs
for(UDTSOCKET receivedDataFromPeer : readfds)
{
- bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data());
- if(receivedData)
+ bool peerDisconnected = false;
+ int receivedDataStatus = receiveDataFromPeer(receivedDataFromPeer, data.data());
+ if(receivedDataStatus == 0)
{
peersMutex.lock();
auto peer = peers[receivedDataFromPeer];
@@ -163,12 +179,30 @@ namespace sibs
Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what());
}
}
+ else if(receivedDataStatus == CUDTException::EINVSOCK)
+ {
+ Log::debug("UDT: Invalid socket %d, did remote peer disconnect?", receivedDataFromPeer);
+ peerDisconnected = true;
+ }
+ else if(receivedDataStatus == CUDTException::ECONNLOST)
+ {
+ Log::debug("UDT: Connection was broken to socket %d (peer most likely disconnected), removing peer", receivedDataFromPeer);
+ peerDisconnected = true;
+ }
+
+ if(peerDisconnected)
+ {
+ if(removePeer(receivedDataFromPeer))
+ Log::debug("UDT: Removed peer socket %d", receivedDataFromPeer);
+ else
+ Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer);
+ }
}
readfds.clear();
}
}
- bool DirectConnections::receiveDataFromPeer(const int socket, char *output)
+ int DirectConnections::receiveDataFromPeer(const int socket, char *output)
{
usize receivedTotalSize = 0;
while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE)
@@ -177,26 +211,26 @@ namespace sibs
int receiveSizeDataTypeSize = sizeof(dataAvailableSize);
if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR)
{
- Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc());
- return false;
+ Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
}
int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0);
if(receivedSize == UDT::ERROR)
{
- Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc());
- return false;
+ Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code());
+ return UDT::getlasterror_code();
}
if(receivedSize == 0)
{
- return true;
+ return 0;
}
receivedTotalSize += dataAvailableSize;
}
Log::error("UDT: Received too much data, ignoring...");
- return false;
+ return 0;
}
}