aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/BootstrapNode.cpp16
-rw-r--r--src/DirectConnection.cpp20
2 files changed, 33 insertions, 3 deletions
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index e62e48d..273abf0 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -17,6 +17,21 @@ namespace sibs
connections(27130),
socket(connections.createSocket(address, false, true))
{
+ connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
+ {
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ for(auto &topicUsers : subscribedPeers)
+ {
+ for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); )
+ {
+ if(peer->address == (*it)->address)
+ it = topicUsers.second.erase(it);
+ else
+ ++it;
+ }
+ }
+ });
+
if(UDT::listen(socket, 10) == UDT::ERROR)
{
std::string errMsg = "UDT: Failed to listen, error: ";
@@ -75,6 +90,7 @@ namespace sibs
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto &peer : peers)
{
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 71c7f1f..083f557 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -29,7 +29,8 @@ namespace sibs
DirectConnections::DirectConnections(u16 _port) :
port(_port == 0 ? (u16)generateRandomNumber(2000, 32000) : _port),
- alive(true)
+ alive(true),
+ removeDisconnectedPeerCallback(nullptr)
{
UDT::startup();
eid = UDT::epoll_create();
@@ -185,6 +186,11 @@ namespace sibs
sendDataCallbackFunc(PubSubResult::OK, "");
}).detach();
}
+
+ void DirectConnections::onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc)
+ {
+ removeDisconnectedPeerCallback = callbackFunc;
+ }
bool DirectConnections::removePeer(int peerSocket)
{
@@ -193,6 +199,8 @@ namespace sibs
auto peerIt = peers.find(peerSocket);
if(peerIt != peers.end())
{
+ if(removeDisconnectedPeerCallback)
+ removeDisconnectedPeerCallback(peerIt->second);
UDT::epoll_remove_usock(eid, peerSocket);
peers.erase(peerIt);
wasRemoved = true;
@@ -206,10 +214,16 @@ namespace sibs
peersMutex.lock();
for(std::unordered_map<int, std::shared_ptr<DirectConnectionPeer>>::iterator it = peers.begin(); it != peers.end(); )
{
- UDTSTATUS peerSocketStatus = UDT::getsockstate(it->first);
+ int socket = it->first;
+ UDTSTATUS peerSocketStatus = UDT::getsockstate(socket);
if(peerSocketStatus == UDTSTATUS::BROKEN || peerSocketStatus == UDTSTATUS::CLOSING || peerSocketStatus == UDTSTATUS::CLOSED || peerSocketStatus == UDTSTATUS::NONEXIST)
{
- int socket = it->first;
+ if(removeDisconnectedPeerCallback)
+ removeDisconnectedPeerCallback(it->second);
+
+ if(peerSocketStatus == UDTSTATUS::BROKEN)
+ UDT::epoll_remove_usock(eid, socket);
+
Log::debug("UDT: Connection was broken to socket %d (peer most likely disconnected), removing peer", socket);
it = peers.erase(it);
Log::debug("UDT: Removed peer socket %d", socket);