aboutsummaryrefslogtreecommitdiff
path: root/src/BootstrapConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/BootstrapConnection.cpp')
-rw-r--r--src/BootstrapConnection.cpp165
1 files changed, 138 insertions, 27 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index e5f5178..0237a90 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -8,7 +8,7 @@ namespace sibs
{
connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
{
- std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
for(auto &topicUsers : subscribedPeers)
{
for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); )
@@ -30,7 +30,7 @@ namespace sibs
connectResult = result;
connectResultStr = resultStr;
connected = true;
- }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
while(!connected)
{
@@ -46,23 +46,27 @@ namespace sibs
}
// TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key
- void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
+ if(messageType != MessageType::SUBSCRIBE)
+ {
+ Log::warn("BootstrapConnection: received message from server that was not subscribe");
+ return;
+ }
+
Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node");
sibs::SafeDeserializer deserializer((const u8*)data, size);
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
- listenerCallbackFuncMutex.lock();
+ // we want lock to live this whole scope so we dont connect to peer when cancelListen is called
+ std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
if(listenerFuncIt == listenCallbackFuncs.end())
{
- Log::debug("BoostrapConnection: No listener found for key XXX, ignoring...");
- listenerCallbackFuncMutex.unlock();
+ Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
return;
}
- auto listenerCallbackFunc = listenerFuncIt->second;
- listenerCallbackFuncMutex.unlock();
while(!deserializer.empty())
{
@@ -76,18 +80,19 @@ namespace sibs
newPeerAddress.address.sin_addr.s_addr = ipv4Address;
newPeerAddress.address.sin_port = port;
memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
- connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> newPeer, PubSubResult result, const std::string &resultStr)
+ Log::debug("BootstrapConnection: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort());
+ connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)
{
if(result == PubSubResult::OK)
{
- subscribedPeersMutex.lock();
- subscribedPeers[pubsubKey].push_back(newPeer);
- subscribedPeersMutex.unlock();
- Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", newPeer->address.getAddress().c_str(), newPeer->address.getPort());
+ std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
+ subscribedPeers[pubsubKey].push_back(peer);
+ ++peer->sharedKeys;
+ Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
}
else
Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str());
- }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
+ }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
}
else
{
@@ -97,46 +102,152 @@ namespace sibs
}
}
- void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)
+ void BootstrapConnection::receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
- if(listenCallbackFunc)
- listenCallbackFunc(peer.get(), data, size);
+ if(size < PUBSUB_KEY_LENGTH)
+ return;
+
+ PubsubKey pubsubKey;
+ memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH);
+ if(messageType == MessageType::DATA)
+ {
+ listenerCallbackFuncMutex.lock();
+ auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
+ if(listenerFuncIt == listenCallbackFuncs.end())
+ {
+ listenerCallbackFuncMutex.unlock();
+ Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ return;
+ }
+ auto listenCallbackFunc = listenerFuncIt->second;
+ listenerCallbackFuncMutex.unlock();
+
+ if(listenCallbackFunc)
+ {
+ bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH);
+ if(!continueListening)
+ cancelListen({ pubsubKey });
+ }
+ }
+ else if(messageType == MessageType::UNSUBSCRIBE)
+ {
+ Log::debug("BootstrapConnection: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
+ std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex);
+ auto peersListIt = subscribedPeers.find(pubsubKey);
+ if(peersListIt == subscribedPeers.end())
+ return;
+
+ for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it)
+ {
+ auto existingPeer = *it;
+ if(*existingPeer == *peer)
+ {
+ peersListIt->second.erase(it);
+ --peer->sharedKeys;
+ if(peer->sharedKeys <= 0)
+ connections.removePeer(peer->socket->udtSocket);
+ break;
+ }
+ }
+ }
+ else
+ {
+ Log::warn("BootstrapConnection: received message from peer that was not data or unsubscribe");
+ }
}
- void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
+ ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
{
{
- std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
throw PubsubKeyAlreadyListeningException("");
listenCallbackFuncs[pubsubKey] = callbackFunc;
}
- connections.send(serverPeer, std::make_shared<std::vector<u8>>(pubsubKey.data.begin(), pubsubKey.data.end()),
- [](PubSubResult result, const std::string &resultStr)
+
+ auto message = std::make_shared<Message>(MessageType::SUBSCRIBE);
+ message->append(pubsubKey.data.data(), pubsubKey.data.size());
+ connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr)
{
Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str());
});
+
+ return { pubsubKey };
}
- void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data)
+ bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size)
{
+ if(size > 819200) // 800kb
+ return false;
+
{
- std::lock_guard<std::mutex> lock(listenerCallbackFuncMutex);
+ std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey);
if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second)
- listenCallbackFuncIt->second(nullptr, data->data(), data->size());
+ listenCallbackFuncIt->second(nullptr, data, size);
+
+ if(listenCallbackFuncIt == listenCallbackFuncs.end())
+ Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str());
}
- std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
auto peersIt = subscribedPeers.find(pubsubKey);
if(peersIt == subscribedPeers.end())
{
- return;
+ Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str());
+ return true;
}
+ auto message = std::make_shared<Message>(MessageType::DATA);
+ message->append(pubsubKey.data.data(), pubsubKey.data.size());
+ message->append(data, size);
for(auto &peer : peersIt->second)
{
- connections.send(peer, data);
+ connections.send(peer, message);
+ }
+ return true;
+ }
+
+ bool BootstrapConnection::cancelListen(const ListenHandle &listener)
+ {
+ {
+ std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
+ auto it = listenCallbackFuncs.find(listener.key);
+ if(it == listenCallbackFuncs.end())
+ return false;
+ listenCallbackFuncs.erase(it);
+
+ auto message = std::make_shared<Message>(MessageType::UNSUBSCRIBE);
+ message->append(listener.key.data.data(), listener.key.data.size());
+ connections.send(serverPeer, message);
+
+ std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex);
+ auto peersListIt = subscribedPeers.find(listener.key);
+ // this will happen if there are no other peers subscribed to the key
+ if(peersListIt == subscribedPeers.end())
+ return true;
+
+ for(auto &peer : peersListIt->second)
+ {
+ --peer->sharedKeys;
+ if(peer->sharedKeys <= 0)
+ {
+ // disconnect from peer
+ connections.removePeer(peer->socket->udtSocket);
+ }
+ else
+ {
+ // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore
+ connections.send(peer, message);
+ }
+ }
+ subscribedPeers.erase(peersListIt);
}
+ return true;
+ }
+
+ std::vector<std::shared_ptr<DirectConnectionPeer>> BootstrapConnection::getPeers()
+ {
+ return connections.getPeers();
}
}