diff options
author | dec05eba <0xdec05eba@gmail.com> | 2018-10-14 06:05:09 +0200 |
---|---|---|
committer | dec05eba <0xdec05eba@gmail.com> | 2018-10-14 06:05:12 +0200 |
commit | efc7311893b0fb25129eb2b715992ba2ac43d65c (patch) | |
tree | d623dfe299f3ae1afec8238be09b5cd0542af7e1 | |
parent | 55405854c27417ec33797c9e2b438ce17ac70bf2 (diff) |
Removed user from subscriptions on disconnect
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | include/sibs/BootstrapNode.hpp | 2 | ||||
-rw-r--r-- | include/sibs/DirectConnection.hpp | 4 | ||||
-rw-r--r-- | src/BootstrapNode.cpp | 16 | ||||
-rw-r--r-- | src/DirectConnection.cpp | 20 |
5 files changed, 41 insertions, 3 deletions
@@ -1 +1,3 @@ sibs-build/ +.vscode/ +compile_commands.json diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp index 48e527c..ab3d6b3 100644 --- a/include/sibs/BootstrapNode.hpp +++ b/include/sibs/BootstrapNode.hpp @@ -4,6 +4,7 @@ #include "IpAddress.hpp" #include "PubsubKey.hpp" #include <vector> +#include <mutex> namespace sibs { @@ -28,5 +29,6 @@ namespace sibs int socket; std::thread acceptConnectionsThread; PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers; + std::mutex subscribedPeersMutex; }; } diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 4b5c67f..8e3865f 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -31,6 +31,7 @@ namespace sibs using PubSubConnectCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)>; using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>; using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>; + using PubSubOnRemoveDisconnectedPeerCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer)>; struct DirectConnectionPeer { @@ -53,6 +54,8 @@ namespace sibs void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc); void send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); + + void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc); protected: int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true); private: @@ -68,5 +71,6 @@ namespace sibs std::thread receiveDataThread; std::mutex peersMutex; bool alive; + PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback; }; } diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index e62e48d..273abf0 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -17,6 +17,21 @@ namespace sibs connections(27130), socket(connections.createSocket(address, false, true)) { + connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer) + { + std::lock_guard<std::mutex> lock(subscribedPeersMutex); + for(auto &topicUsers : subscribedPeers) + { + for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); ) + { + if(peer->address == (*it)->address) + it = topicUsers.second.erase(it); + else + ++it; + } + } + }); + if(UDT::listen(socket, 10) == UDT::ERROR) { std::string errMsg = "UDT: Failed to listen, error: "; @@ -75,6 +90,7 @@ namespace sibs PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + std::lock_guard<std::mutex> lock(subscribedPeersMutex); auto &peers = subscribedPeers[pubsubKey]; for(auto &peer : peers) { diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index 71c7f1f..083f557 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -29,7 +29,8 @@ namespace sibs DirectConnections::DirectConnections(u16 _port) : port(_port == 0 ? (u16)generateRandomNumber(2000, 32000) : _port), - alive(true) + alive(true), + removeDisconnectedPeerCallback(nullptr) { UDT::startup(); eid = UDT::epoll_create(); @@ -185,6 +186,11 @@ namespace sibs sendDataCallbackFunc(PubSubResult::OK, ""); }).detach(); } + + void DirectConnections::onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc) + { + removeDisconnectedPeerCallback = callbackFunc; + } bool DirectConnections::removePeer(int peerSocket) { @@ -193,6 +199,8 @@ namespace sibs auto peerIt = peers.find(peerSocket); if(peerIt != peers.end()) { + if(removeDisconnectedPeerCallback) + removeDisconnectedPeerCallback(peerIt->second); UDT::epoll_remove_usock(eid, peerSocket); peers.erase(peerIt); wasRemoved = true; @@ -206,10 +214,16 @@ namespace sibs peersMutex.lock(); for(std::unordered_map<int, std::shared_ptr<DirectConnectionPeer>>::iterator it = peers.begin(); it != peers.end(); ) { - UDTSTATUS peerSocketStatus = UDT::getsockstate(it->first); + int socket = it->first; + UDTSTATUS peerSocketStatus = UDT::getsockstate(socket); if(peerSocketStatus == UDTSTATUS::BROKEN || peerSocketStatus == UDTSTATUS::CLOSING || peerSocketStatus == UDTSTATUS::CLOSED || peerSocketStatus == UDTSTATUS::NONEXIST) { - int socket = it->first; + if(removeDisconnectedPeerCallback) + removeDisconnectedPeerCallback(it->second); + + if(peerSocketStatus == UDTSTATUS::BROKEN) + UDT::epoll_remove_usock(eid, socket); + Log::debug("UDT: Connection was broken to socket %d (peer most likely disconnected), removing peer", socket); it = peers.erase(it); Log::debug("UDT: Removed peer socket %d", socket); |