From 824e669ddb12f1c30074c84eb731fa598f92ccfa Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 8 Jun 2018 03:06:07 +0200 Subject: Add put method --- include/sibs/BootstrapConnection.hpp | 4 ++++ src/BootstrapConnection.cpp | 46 +++++++++++++++++++++++++++++------- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp index 964d777..094ee32 100644 --- a/include/sibs/BootstrapConnection.hpp +++ b/include/sibs/BootstrapConnection.hpp @@ -3,6 +3,7 @@ #include "../utils.hpp" #include "DirectConnection.hpp" #include "PubsubKey.hpp" +#include namespace sibs { @@ -22,6 +23,7 @@ namespace sibs // Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); + void put(const PubsubKey &pubsubKey, std::shared_ptr> data); private: void receiveDataFromServer(std::shared_ptr peer, const void *data, const usize size); void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr peer, const void *data, const usize size); @@ -29,5 +31,7 @@ namespace sibs DirectConnections connections; std::shared_ptr serverPeer; PubsubKeyMap listenCallbackFuncs; + PubsubKeyMap>> subscribedPeers; + std::mutex listenerCallbackFuncMutex; }; } diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 7c75cde..395e9e0 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -16,13 +16,18 @@ namespace sibs PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); + listenerCallbackFuncMutex.lock(); auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenerFuncIt == listenCallbackFuncs.end()) { Log::debug("BoostrapConnection: No listener found for key XXX, ignoring..."); + listenerCallbackFuncMutex.unlock(); return; } + auto listenerCallbackFunc = listenerFuncIt->second; + listenerCallbackFuncMutex.unlock(); + auto &peers = subscribedPeers[pubsubKey]; while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); @@ -30,13 +35,14 @@ namespace sibs { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); - Ipv4 newPeer; - newPeer.address.sin_family = addressFamily; - newPeer.address.sin_addr.s_addr = ipv4Address; - newPeer.address.sin_port = port; - memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero)); + Ipv4 newPeerAddress; + newPeerAddress.address.sin_family = addressFamily; + newPeerAddress.address.sin_addr.s_addr = ipv4Address; + newPeerAddress.address.sin_port = port; + memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); // TODO: Move connection to thread and add callback function, just like @receiveData and @send - connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + std::shared_ptr newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + peers.push_back(newPeer); } else Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); @@ -51,9 +57,31 @@ namespace sibs void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) { - if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end()) - throw PubsubKeyAlreadyListeningException(""); - listenCallbackFuncs[pubsubKey] = callbackFunc; + { + std::lock_guard lock(listenerCallbackFuncMutex); + if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end()) + throw PubsubKeyAlreadyListeningException(""); + listenCallbackFuncs[pubsubKey] = callbackFunc; + } connections.send(serverPeer, std::make_shared>(pubsubKey.data.data(), pubsubKey.data.data())); } + + void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr> data) + { + { + std::lock_guard lock(listenerCallbackFuncMutex); + auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey); + if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second) + listenCallbackFuncIt->second(data->data(), data->size()); + } + + auto peersIt = subscribedPeers.find(pubsubKey); + if(peersIt == subscribedPeers.end()) + return; + + for(auto &peer : peersIt->second) + { + connections.send(peer, data); + } + } } -- cgit v1.2.3