aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/BootstrapConnection.cpp46
1 files changed, 37 insertions, 9 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 7c75cde..395e9e0 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -16,13 +16,18 @@ namespace sibs
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
+ listenerCallbackFuncMutex.lock();
auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
if(listenerFuncIt == listenCallbackFuncs.end())
{
Log::debug("BoostrapConnection: No listener found for key XXX, ignoring...");
+ listenerCallbackFuncMutex.unlock();
return;
}
+ auto listenerCallbackFunc = listenerFuncIt->second;
+ listenerCallbackFuncMutex.unlock();
+ auto &peers = subscribedPeers[pubsubKey];
while(!deserializer.empty())
{
sa_family_t addressFamily = deserializer.extract<u32>();
@@ -30,13 +35,14 @@ namespace sibs
{
in_addr_t ipv4Address = deserializer.extract<u32>();
u16 port = deserializer.extract<u32>();
- Ipv4 newPeer;
- newPeer.address.sin_family = addressFamily;
- newPeer.address.sin_addr.s_addr = ipv4Address;
- newPeer.address.sin_port = port;
- memset(newPeer.address.sin_zero, 0, sizeof(newPeer.address.sin_zero));
+ Ipv4 newPeerAddress;
+ newPeerAddress.address.sin_family = addressFamily;
+ newPeerAddress.address.sin_addr.s_addr = ipv4Address;
+ newPeerAddress.address.sin_port = port;
+ memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
// TODO: Move connection to thread and add callback function, just like @receiveData and @send
- connections.connect(newPeer, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerFuncIt->second, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ std::shared_ptr<DirectConnectionPeer> newPeer = connections.connect(newPeerAddress, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ peers.push_back(newPeer);
}
else
Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
@@ -51,9 +57,31 @@ namespace sibs
void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
{
- if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
- throw PubsubKeyAlreadyListeningException("");
- listenCallbackFuncs[pubsubKey] = callbackFunc;
+ {
+ std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
+ throw PubsubKeyAlreadyListeningException("");
+ listenCallbackFuncs[pubsubKey] = callbackFunc;
+ }
connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.data(), pubsubKey.data.data()));
}
+
+ void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data)
+ {
+ {
+ std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey);
+ if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second)
+ listenCallbackFuncIt->second(data->data(), data->size());
+ }
+
+ auto peersIt = subscribedPeers.find(pubsubKey);
+ if(peersIt == subscribedPeers.end())
+ return;
+
+ for(auto &peer : peersIt->second)
+ {
+ connections.send(peer, data);
+ }
+ }
}