#include "../include/sibs/BootstrapConnection.hpp" #include "../include/Log.hpp" #include namespace sibs { BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) { connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { std::lock_guard lock(subscribedPeersMutex); for(auto &topicUsers : subscribedPeers) { for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); ) { if(peer->address == (*it)->address) it = topicUsers.second.erase(it); else ++it; } } }); PubSubResult connectResult = PubSubResult::OK; std::string connectResultStr; bool connected = false; connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { serverPeer = peer; connectResult = result; connectResultStr = resultStr; connected = true; }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); while(!connected) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if(connectResult != PubSubResult::OK) { std::string errMsg = "Failed to connect to bootstrap node, error: "; errMsg += connectResultStr; throw BootstrapConnectionException(errMsg); } } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { if(messageType != MessageType::SUBSCRIBE) { Log::warn("BootstrapConnection: received message from server that was not subscribe"); return; } Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node"); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); // we want lock to live this whole scope so we dont connect to peer when cancelListen is called std::lock_guard lock(listenerCallbackFuncMutex); auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenerFuncIt == listenCallbackFuncs.end()) { Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); if(addressFamily == AF_INET) { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); Ipv4 newPeerAddress; newPeerAddress.address.sin_family = addressFamily; newPeerAddress.address.sin_addr.s_addr = ipv4Address; newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); Log::debug("BootstrapConnection: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { if(result == PubSubResult::OK) { std::lock_guard lock(subscribedPeersMutex); subscribedPeers[pubsubKey].push_back(peer); ++peer->sharedKeys; Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } else Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else { Log::error("BootstrapConnection: Unknown address family: %d", addressFamily); return; } } } void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { if(size < PUBSUB_KEY_LENGTH) return; PubsubKey pubsubKey; memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); if(messageType == MessageType::DATA) { listenerCallbackFuncMutex.lock(); auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenerFuncIt == listenCallbackFuncs.end()) { listenerCallbackFuncMutex.unlock(); Log::debug("BoostrapConnection: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } auto listenCallbackFunc = listenerFuncIt->second; listenerCallbackFuncMutex.unlock(); if(listenCallbackFunc) { bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH); if(!continueListening) cancelListen({ pubsubKey }); } } else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapConnection: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); std::lock_guard subscribersMutex(subscribedPeersMutex); auto peersListIt = subscribedPeers.find(pubsubKey); if(peersListIt == subscribedPeers.end()) return; for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it) { auto existingPeer = *it; if(*existingPeer == *peer) { peersListIt->second.erase(it); --peer->sharedKeys; if(peer->sharedKeys <= 0) connections.removePeer(peer->socket->udtSocket); break; } } } else { Log::warn("BootstrapConnection: received message from peer that was not data or unsubscribe"); } } ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) { { std::lock_guard lock(listenerCallbackFuncMutex); if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end()) throw PubsubKeyAlreadyListeningException(""); listenCallbackFuncs[pubsubKey] = callbackFunc; } auto message = std::make_shared(MessageType::SUBSCRIBE); message->append(pubsubKey.data.data(), pubsubKey.data.size()); connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); }); return { pubsubKey }; } bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { if(size > 819200) // 800kb return false; { std::lock_guard lock(listenerCallbackFuncMutex); auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey); if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second) listenCallbackFuncIt->second(nullptr, data, size); if(listenCallbackFuncIt == listenCallbackFuncs.end()) Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str()); } std::lock_guard lock(subscribedPeersMutex); auto peersIt = subscribedPeers.find(pubsubKey); if(peersIt == subscribedPeers.end()) { Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); return true; } auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); message->append(data, size); for(auto &peer : peersIt->second) { connections.send(peer, message); } return true; } bool BootstrapConnection::cancelListen(const ListenHandle &listener) { { std::lock_guard lock(listenerCallbackFuncMutex); auto it = listenCallbackFuncs.find(listener.key); if(it == listenCallbackFuncs.end()) return false; listenCallbackFuncs.erase(it); auto message = std::make_shared(MessageType::UNSUBSCRIBE); message->append(listener.key.data.data(), listener.key.data.size()); connections.send(serverPeer, message); std::lock_guard subscribersMutex(subscribedPeersMutex); auto peersListIt = subscribedPeers.find(listener.key); // this will happen if there are no other peers subscribed to the key if(peersListIt == subscribedPeers.end()) return true; for(auto &peer : peersListIt->second) { --peer->sharedKeys; if(peer->sharedKeys <= 0) { // disconnect from peer connections.removePeer(peer->socket->udtSocket); } else { // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore connections.send(peer, message); } } subscribedPeers.erase(peersListIt); } return true; } std::vector> BootstrapConnection::getPeers() { return connections.getPeers(); } }