// Implementation of sibs::BootstrapConnection: a pub/sub client that connects to a bootstrap
// node, asks it for the peers subscribed to a key, and then exchanges DATA / UNSUBSCRIBE
// messages directly with those peers.
//
// NOTE(review): this copy of the file looks damaged by a lossy export and is left byte-identical
// below. Every '<...>' span appears to be missing (three '#include' directives have no target;
// std::shared_ptr, std::lock_guard, std::make_shared, std::vector, std::unordered_set,
// chrono::duration_cast and deserializer.extract() have no template arguments), and line breaks
// were collapsed so the original '//' comments now swallow the code that follows them on the same
// physical line. Restore the text from version control before building or editing the code.
//
// What each definition does, as far as the visible code shows:
//
// BootstrapConnection(bootstrapAddress)
//   - Registers a connections.onRemoveDisconnectedPeer callback that, under subscribeDataMutex,
//     erases the first peer with a matching address from every subscribeData entry.
//   - Calls connections.connectServer(); the completion callback stores serverPeer, the result and
//     the result string, then sets the local flag 'connected'.
//   - Polls 'connected' every 10 ms, then throws BootstrapConnectionException with the result
//     string if the result is not PubSubResult::OK.
//   NOTE(review): 'connected', 'connectResult' and 'connectResultStr' are plain locals captured by
//   reference. If connectServer invokes the callback on another thread, the spin-wait is a data
//   race and there is no timeout - confirm, and consider std::atomic or a condition variable.
//
// ~BootstrapConnection()
//   - Sets alive = false and polls putThreadCount every 100 ms until it is no longer > 0.
//
// receiveDataFromServer(peer, messageType, data, size)
//   - Logs a warning and returns for anything other than MessageType::SUBSCRIBE.
//   - Extracts PUBSUB_KEY_LENGTH bytes as the key and returns if subscribeData has no entry for it.
//   - While the deserializer is not empty: reads an address family; for AF_INET reads an IPv4
//     address and port, builds an Ipv4 and calls connections.connect(). On PubSubResult::OK the
//     connect callback re-checks the key under subscribeDataMutex, appends the peer to that
//     entry's peers and increments peer->sharedKeys. Any other address family is logged as an
//     error and parsing stops.
//   NOTE(review): the inline comment says the lock should live for the whole scope, but the
//   lock_guard sits in an inner block that closes before the connect loop. Only the connect
//   callback's own re-check guards against a concurrent cancelListen - confirm that is intended.
//
// receiveDataFromPeer(peer, messageType, data, size)
//   - Returns immediately when size < PUBSUB_KEY_LENGTH; the first PUBSUB_KEY_LENGTH bytes are
//     copied into the key.
//   - DATA: copies the key's listenCallbackFunc under the mutex, unlocks, then invokes it with the
//     bytes after the key. If the callback returns false, cancelListen is called for the key.
//   - UNSUBSCRIBE: under the mutex, erases the first equal peer from the key's peer list,
//     decrements peer->sharedKeys and calls connections.removePeer() once sharedKeys <= 0.
//   - Any other message type only logs a warning.
//
// listen(pubsubKey, callbackFunc, registerCallbackFunc)
//   - Existing key: logs a warning, replaces the callback only when registerCallbackFunc is true,
//     and returns the handle without contacting the server.
//   - New key: creates the entry, stores the callback when registerCallbackFunc is true, records
//     listenStartTimeMs from chrono::system_clock, then sends a SUBSCRIBE message carrying the key
//     to serverPeer.
// listen(pubsubKey, callbackFunc)
//   - Forwards to the three-argument overload with registerCallbackFunc = true.
//
// put(pubsubKey, data, size)
//   - Returns false when size > 819200 bytes.
//   - Under subscribeDataMutex: if the key has a callback it is invoked with a null peer and the
//     data; if the key has no entry, a warning is logged and listen(pubsubKey, nullptr, false) is
//     called.
//   - Builds a DATA message (key followed by data) and starts a detached thread. While 'alive' is
//     set, the thread sends the message to each peer of the key it has not messaged yet (tracked
//     by udtSocket). It stops once more than 30000 ms have passed since listenStartTimeMs,
//     otherwise it sleeps 200 ms between rounds. Returns true.
//   NOTE(review): putThreadCount is incremented inside the thread body, so the destructor may read
//   0 before a freshly started thread has incremented it and return while the thread still uses
//   'this' - consider incrementing before the thread is created.
//   NOTE(review): in the "no subscribers" path the 200 ms sleep runs while subscribeDataMutex is
//   still held, and listenStartTimeMs keeps its previous value (0 on the first round), which makes
//   the duration check end the loop.
//   NOTE(review): the user callback is invoked, and listen() is called, while subscribeDataMutex
//   is held - presumably the mutex is recursive; verify against the header.
//
// cancelListen(listener)
//   - Copies the key's peer list under the mutex (returns false if the key is unknown), sends
//     UNSUBSCRIBE to serverPeer, then for each copied peer decrements sharedKeys and either calls
//     connections.removePeer() (sharedKeys <= 0) or sends the peer the UNSUBSCRIBE message.
//     Finally erases the key's entry under the mutex and returns true.
//   NOTE(review): sharedKeys is modified here without holding subscribeDataMutex, whereas other
//   paths modify it under the lock - confirm whether sharedKeys is atomic.
//
// areWeListeningOnKey(pubsubKey)
//   - Returns whether subscribeData has an entry for the key, checked under the mutex.
// getPeers()
//   - Returns connections.getPeers().
#include "../include/sibs/BootstrapConnection.hpp" #include "../include/Log.hpp" #include #include #include namespace chrono = std::chrono; namespace sibs { BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) : alive(true), putThreadCount(0) { connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { std::lock_guard lock(subscribeDataMutex); for(auto &it : subscribeData) { for(auto subscribedPeersIt = it.second.peers.begin(), end = it.second.peers.end(); subscribedPeersIt != end; ++subscribedPeersIt) { if(peer->address == (*subscribedPeersIt)->address) { it.second.peers.erase(subscribedPeersIt); break; } } } }); PubSubResult connectResult = PubSubResult::OK; std::string connectResultStr; bool connected = false; connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { serverPeer = peer; connectResult = result; connectResultStr = resultStr; connected = true; }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); while(!connected) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if(connectResult != PubSubResult::OK) { std::string errMsg = "Failed to connect to bootstrap node, error: "; errMsg += connectResultStr; throw BootstrapConnectionException(errMsg); } } BootstrapConnection::~BootstrapConnection() { alive = false; while(putThreadCount > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { if(messageType != MessageType::SUBSCRIBE) { Log::warn("BootstrapConnection::receiveDataFromServer: 
received message from server that was not subscribe"); return; } Log::debug("BootstrapConnection::receiveDataFromServer: Received subscriber(s) from bootstrap node"); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); // we want lock to live this whole scope so we dont connect to peer when cancelListen is called { std::lock_guard lock(subscribeDataMutex); auto subscribeDataIt = subscribeData.find(pubsubKey); if(subscribeDataIt == subscribeData.end()) { Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } } while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); if(addressFamily == AF_INET) { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); Ipv4 newPeerAddress; newPeerAddress.address.sin_family = addressFamily; newPeerAddress.address.sin_addr.s_addr = ipv4Address; newPeerAddress.address.sin_port = port; memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { if(result == PubSubResult::OK) { std::lock_guard lock(subscribeDataMutex); auto subscribeDataIt = subscribeData.find(pubsubKey); if(subscribeDataIt == subscribeData.end()) { Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } subscribeDataIt->second.peers.push_back(peer); ++peer->sharedKeys; Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), 
peer->address.getPort()); } else Log::error("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str()); }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else { Log::error("BootstrapConnection::receiveDataFromServer: Unknown address family: %d", addressFamily); return; } } } void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { if(size < PUBSUB_KEY_LENGTH) return; PubsubKey pubsubKey; memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); if(messageType == MessageType::DATA) { subscribeDataMutex.lock(); auto listenerFuncIt = subscribeData.find(pubsubKey); if(listenerFuncIt == subscribeData.end()) { subscribeDataMutex.unlock(); Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; subscribeDataMutex.unlock(); if(listenCallbackFunc) { bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH); if(!continueListening) cancelListen({ pubsubKey }); } } else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); std::lock_guard subscribersMutex(subscribeDataMutex); auto peersListIt = subscribeData.find(pubsubKey); if(peersListIt == subscribeData.end()) return; for(auto it = peersListIt->second.peers.begin(); it != peersListIt->second.peers.end(); ++it) { auto existingPeer = *it; if(*existingPeer == *peer) { peersListIt->second.peers.erase(it); --peer->sharedKeys; if(peer->sharedKeys <= 0) 
connections.removePeer(peer->socket->udtSocket); break; } } } else { Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe"); } } ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc) { { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(pubsubKey); if(it != subscribeData.end()) { Log::warn("BootstrapConnection::listen called on existing listener, overwriting callback function"); if(registerCallbackFunc) it->second.listenCallbackFunc = callbackFunc; return { pubsubKey }; } auto &data = subscribeData[pubsubKey]; if(registerCallbackFunc) data.listenCallbackFunc = callbackFunc; data.listenStartTimeMs = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); } auto message = std::make_shared(MessageType::SUBSCRIBE); message->append(pubsubKey.data.data(), pubsubKey.data.size()); Log::debug("BootstrapConnection::listen: starting to listen to %s", pubsubKey.toString().c_str()); connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); }); return { pubsubKey }; } ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) { return listen(pubsubKey, callbackFunc, true); } bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { if(size > 819200) // 800kb return false; { std::lock_guard lock(subscribeDataMutex); auto listenCallbackFuncIt = subscribeData.find(pubsubKey); if(listenCallbackFuncIt != subscribeData.end() && listenCallbackFuncIt->second.listenCallbackFunc) listenCallbackFuncIt->second.listenCallbackFunc(nullptr, data, size); if(listenCallbackFuncIt == subscribeData.end()) { Log::warn("BootstrapConnection::put on key '%s' which we are 
not listening to, automatically listening now", pubsubKey.toString().c_str()); listen(pubsubKey, nullptr, false); } } auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); message->append(data, size); std::thread([this, pubsubKey, message]() { ++putThreadCount; std::unordered_set peersMessaged; std::vector> peersToMessage; i64 listenStartTimeMs = 0; while(alive) { { std::lock_guard lock(subscribeDataMutex); auto subscribeIt = subscribeData.find(pubsubKey); if(subscribeIt == subscribeData.end()) { Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str()); std::this_thread::sleep_for(std::chrono::milliseconds(200)); goto checkPutAttemptDuration; } listenStartTimeMs = subscribeIt->second.listenStartTimeMs; for(auto &peer : subscribeIt->second.peers) { if(peersMessaged.find(peer->socket->udtSocket) == peersMessaged.end()) { peersToMessage.push_back(peer); peersMessaged.insert(peer->socket->udtSocket); } } } if(!peersToMessage.empty()) { for(auto &peer : peersToMessage) { connections.send(peer, message); } peersToMessage.clear(); } checkPutAttemptDuration: const i64 half_min_ms = 1000 * 30; i64 now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); if(now - listenStartTimeMs > half_min_ms) break; std::this_thread::sleep_for(std::chrono::milliseconds(200)); } --putThreadCount; }).detach(); return true; } bool BootstrapConnection::cancelListen(const ListenHandle &listener) { std::vector> peersToMessage; { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(listener.key); if(it == subscribeData.end()) return false; peersToMessage = it->second.peers; } auto message = std::make_shared(MessageType::UNSUBSCRIBE); message->append(listener.key.data.data(), listener.key.data.size()); connections.send(serverPeer, message); for(auto &peer : peersToMessage) { --peer->sharedKeys; if(peer->sharedKeys <= 0) { // disconnect from 
peer connections.removePeer(peer->socket->udtSocket); } else { // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore connections.send(peer, message); } } { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(listener.key); if(it != subscribeData.end()) subscribeData.erase(it); } return true; } bool BootstrapConnection::areWeListeningOnKey(const PubsubKey &pubsubKey) { std::lock_guard lock(subscribeDataMutex); return subscribeData.find(pubsubKey) != subscribeData.end(); } std::vector> BootstrapConnection::getPeers() { return connections.getPeers(); } }