aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-14 06:05:09 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-14 06:05:12 +0200
commitefc7311893b0fb25129eb2b715992ba2ac43d65c (patch)
treed623dfe299f3ae1afec8238be09b5cd0542af7e1
parent55405854c27417ec33797c9e2b438ce17ac70bf2 (diff)
Removed user from subscriptions on disconnect
-rw-r--r--.gitignore2
-rw-r--r--include/sibs/BootstrapNode.hpp2
-rw-r--r--include/sibs/DirectConnection.hpp4
-rw-r--r--src/BootstrapNode.cpp16
-rw-r--r--src/DirectConnection.cpp20
5 files changed, 41 insertions, 3 deletions
diff --git a/.gitignore b/.gitignore
index 97420ef..39ac517 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,3 @@
sibs-build/
+.vscode/
+compile_commands.json
diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp
index 48e527c..ab3d6b3 100644
--- a/include/sibs/BootstrapNode.hpp
+++ b/include/sibs/BootstrapNode.hpp
@@ -4,6 +4,7 @@
#include "IpAddress.hpp"
#include "PubsubKey.hpp"
#include <vector>
+#include <mutex>
namespace sibs
{
@@ -28,5 +29,6 @@ namespace sibs
int socket;
std::thread acceptConnectionsThread;
PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
+ std::mutex subscribedPeersMutex;
};
}
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index 4b5c67f..8e3865f 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -31,6 +31,7 @@ namespace sibs
using PubSubConnectCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)>;
using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>;
using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>;
+ using PubSubOnRemoveDisconnectedPeerCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer)>;
struct DirectConnectionPeer
{
@@ -53,6 +54,8 @@ namespace sibs
void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc);
void send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr);
+
+ void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc);
protected:
int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true);
private:
@@ -68,5 +71,6 @@ namespace sibs
std::thread receiveDataThread;
std::mutex peersMutex;
bool alive;
+ PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback;
};
}
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index e62e48d..273abf0 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -17,6 +17,21 @@ namespace sibs
connections(27130),
socket(connections.createSocket(address, false, true))
{
+ connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
+ {
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ for(auto &topicUsers : subscribedPeers)
+ {
+ for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); )
+ {
+ if(peer->address == (*it)->address)
+ it = topicUsers.second.erase(it);
+ else
+ ++it;
+ }
+ }
+ });
+
if(UDT::listen(socket, 10) == UDT::ERROR)
{
std::string errMsg = "UDT: Failed to listen, error: ";
@@ -75,6 +90,7 @@ namespace sibs
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
auto &peers = subscribedPeers[pubsubKey];
for(auto &peer : peers)
{
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 71c7f1f..083f557 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -29,7 +29,8 @@ namespace sibs
DirectConnections::DirectConnections(u16 _port) :
port(_port == 0 ? (u16)generateRandomNumber(2000, 32000) : _port),
- alive(true)
+ alive(true),
+ removeDisconnectedPeerCallback(nullptr)
{
UDT::startup();
eid = UDT::epoll_create();
@@ -185,6 +186,11 @@ namespace sibs
sendDataCallbackFunc(PubSubResult::OK, "");
}).detach();
}
+
+ void DirectConnections::onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc)
+ {
+ removeDisconnectedPeerCallback = callbackFunc;
+ }
bool DirectConnections::removePeer(int peerSocket)
{
@@ -193,6 +199,8 @@ namespace sibs
auto peerIt = peers.find(peerSocket);
if(peerIt != peers.end())
{
+ if(removeDisconnectedPeerCallback)
+ removeDisconnectedPeerCallback(peerIt->second);
UDT::epoll_remove_usock(eid, peerSocket);
peers.erase(peerIt);
wasRemoved = true;
@@ -206,10 +214,16 @@ namespace sibs
peersMutex.lock();
for(std::unordered_map<int, std::shared_ptr<DirectConnectionPeer>>::iterator it = peers.begin(); it != peers.end(); )
{
- UDTSTATUS peerSocketStatus = UDT::getsockstate(it->first);
+ int socket = it->first;
+ UDTSTATUS peerSocketStatus = UDT::getsockstate(socket);
if(peerSocketStatus == UDTSTATUS::BROKEN || peerSocketStatus == UDTSTATUS::CLOSING || peerSocketStatus == UDTSTATUS::CLOSED || peerSocketStatus == UDTSTATUS::NONEXIST)
{
- int socket = it->first;
+ if(removeDisconnectedPeerCallback)
+ removeDisconnectedPeerCallback(it->second);
+
+ if(peerSocketStatus == UDTSTATUS::BROKEN)
+ UDT::epoll_remove_usock(eid, socket);
+
Log::debug("UDT: Connection was broken to socket %d (peer most likely disconnected), removing peer", socket);
it = peers.erase(it);
Log::debug("UDT: Removed peer socket %d", socket);