aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/sibs/BootstrapConnection.hpp4
-rw-r--r--src/BootstrapConnection.cpp46
2 files changed, 41 insertions, 9 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp
index 964d777..094ee32 100644
--- a/include/sibs/BootstrapConnection.hpp
+++ b/include/sibs/BootstrapConnection.hpp
@@ -3,6 +3,7 @@
#include "../utils.hpp"
#include "DirectConnection.hpp"
#include "PubsubKey.hpp"
+#include <mutex>
namespace sibs
{
@@ -22,6 +23,7 @@ namespace sibs
// Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey
void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc);
+ void put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data);
private:
void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
@@ -29,5 +31,7 @@ namespace sibs
DirectConnections connections;
std::shared_ptr<DirectConnectionPeer> serverPeer;
PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs;
+ PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
+ std::mutex listenerCallbackFuncMutex;
};
}
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 7c75cde..395e9e0 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -16,13 +16,18 @@ namespace sibs
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ listenerCallbackFuncMutex.lock();
auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
if(listenerFuncIt == listenCallbackFuncs.end())
{
Log::debug("BoostrapConnection: No listener found for key XXX, ignoring...");
+ listenerCallbackFuncMutex.unlock();
return;
}
+ auto listenerCallbackFunc = listenerFuncIt->second;
+ listenerCallbackFuncMutex.unlock();
+ auto &peers = subscribedPeers[pubsubKey];
while(!deserializer.empty())
{
sa_family_t addressFamily = deserializer.extract<u32>();
@@ -30,13 +35,14 @@ namespace sibs
{
in_addr_t ipv4Address = deserializer.extract<u32>();
u16 port = deserializer.extract<u32>();
- Ipv4 newPeer;
- newPeer.address.sin_family = addressFamily;
- newPeer.address.sin_addr.s_addr = ipv4Address;
- newPeer.address.sin_port = port;
- memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero));
+ Ipv4 newPeerAddress;
+ newPeerAddress.address.sin_family = addressFamily;
+ newPeerAddress.address.sin_addr.s_addr = ipv4Address;
+ newPeerAddress.address.sin_port = port;
+ memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
// TODO: Move connection to thread and add callback function, just like @receiveData and @send
- connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ std::shared_ptr<DirectConnectionPeer> newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ peers.push_back(newPeer);
}
else
Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
@@ -51,9 +57,31 @@ namespace sibs
void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
{
- if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
- throw PubsubKeyAlreadyListeningException("");
- listenCallbackFuncs[pubsubKey] = callbackFunc;
+ {
+ std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
+ throw PubsubKeyAlreadyListeningException("");
+ listenCallbackFuncs[pubsubKey] = callbackFunc;
+ }
connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.data(), pubsubKey.data.data()));
}
+
+ void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data)
+ {
+ {
+ std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey);
+ if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second)
+ listenCallbackFuncIt->second(data->data(), data->size());
+ }
+
+ auto peersIt = subscribedPeers.find(pubsubKey);
+ if(peersIt == subscribedPeers.end())
+ return;
+
+ for(auto &peer : peersIt->second)
+ {
+ connections.send(peer, data);
+ }
+ }
}