diff options
-rw-r--r-- | include/sibs/DirectConnection.hpp | 3 | ||||
-rw-r--r-- | src/BootstrapNode.cpp | 2 | ||||
-rw-r--r-- | src/DirectConnection.cpp | 52 |
3 files changed, 46 insertions, 11 deletions
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 0e8b961..45ba5d9 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -57,7 +57,8 @@ namespace sibs private: std::shared_ptr<DirectConnectionPeer> connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubReceiveDataCallback receiveDataCallbackFunc); void receiveData(); - bool receiveDataFromPeer(const int socket, char *output); + int receiveDataFromPeer(const int socket, char *output); + bool removePeer(int peerSocket); private: u16 port; int eid; diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index fb10dcc..71ff0c9 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -53,7 +53,7 @@ namespace sibs char clientHost[NI_MAXHOST]; char clientService[NI_MAXSERV]; getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV); - Log::debug("UDT: New connection: %s:%s", clientHost, clientService); + Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket); UDT::epoll_add_usock(connections.eid, clientSocket); std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>(); diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 6d9fe27..6001ebc 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -123,6 +123,21 @@ namespace sibs }).detach(); } + bool DirectConnections::removePeer(int peerSocket) + { + bool wasRemoved = false; + peersMutex.lock(); + auto peerIt = peers.find(peerSocket); + if(peerIt != peers.end()) + { + UDT::epoll_remove_usock(eid, peerSocket); + peers.erase(peerIt); + wasRemoved = true; + } + peersMutex.unlock(); + return wasRemoved; + } + void DirectConnections::receiveData() { std::vector<char> data; @@ -147,8 +162,9 @@ namespace sibs for(UDTSOCKET receivedDataFromPeer : readfds) { - bool receivedData = receiveDataFromPeer(receivedDataFromPeer, data.data()); - if(receivedData) + bool peerDisconnected = false; + int receivedDataStatus = receiveDataFromPeer(receivedDataFromPeer, data.data()); + if(receivedDataStatus == 0) { peersMutex.lock(); auto peer = peers[receivedDataFromPeer]; @@ -163,12 +179,30 @@ namespace sibs Log::error("UDT: Receive callback function threw exception: %s, ignoring...", e.what()); } } + else if(receivedDataStatus == CUDTException::EINVSOCK) + { + Log::debug("UDT: Invalid socket %d, did remote peer disconnect?", receivedDataFromPeer); + peerDisconnected = true; + } + else if(receivedDataStatus == CUDTException::ECONNLOST) + { + Log::debug("UDT: Connection was broken to socket %d (peer most likely disconnected), removing peer", receivedDataFromPeer); + peerDisconnected = true; + } + + if(peerDisconnected) + { + if(removePeer(receivedDataFromPeer)) + Log::debug("UDT: Removed peer socket %d", receivedDataFromPeer); + else + Log::error("UDT: Failed to remove peer socket %d, system said we got data from it but we are not connected to it", receivedDataFromPeer); + } } readfds.clear(); } } - bool DirectConnections::receiveDataFromPeer(const int socket, char *output) + int DirectConnections::receiveDataFromPeer(const int socket, char *output) { usize receivedTotalSize = 0; while(receivedTotalSize < MAX_RECEIVED_DATA_SIZE) @@ -177,26 +211,26 @@ namespace sibs int receiveSizeDataTypeSize = sizeof(dataAvailableSize); if(UDT::getsockopt(socket, 0, UDT_RCVDATA, &dataAvailableSize, &receiveSizeDataTypeSize) == UDT::ERROR) { - Log::error("UDT: Failed to receive data available size, error: %s", UDT::getlasterror_desc()); - return false; + Log::error("UDT: Failed to receive data available size, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); } int receivedSize = UDT::recv(socket, &output[receivedTotalSize], MAX_RECEIVED_DATA_SIZE - receivedTotalSize, 0); if(receivedSize == UDT::ERROR) { - Log::error("UDT: Failed to receive data, error: %s", UDT::getlasterror_desc()); - return false; + Log::error("UDT: Failed to receive data, error: %s (%d)", UDT::getlasterror_desc(), UDT::getlasterror_code()); + return UDT::getlasterror_code(); } if(receivedSize == 0) { - return true; + return 0; } receivedTotalSize += dataAvailableSize; } Log::error("UDT: Received too much data, ignoring..."); - return false; + return 0; } } |