#include "../include/sibs/BootstrapConnection.hpp"
#include "../include/Log.hpp"
#include <atomic>
#include <chrono>
#include <thread>

// NOTE(review): several template argument lists are absent from this text
// (the element type of the std::shared_ptr parameters, the widths passed to
// SafeDeserializer::extract, and the argument of std::make_shared in listen()).
// They are left exactly as found because they cannot be determined from this
// file alone; restore them from BootstrapConnection.hpp and the bootstrap wire
// format before building.

namespace sibs
{
    /// Connects to the bootstrap node at @p bootstrapAddress and blocks until the
    /// connection attempt has completed.
    ///
    /// @throws BootstrapConnectionException if the connection result is not PubSubResult::OK.
    BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress)
    {
        PubSubResult connectResult = PubSubResult::OK;
        std::string connectResultStr;
        // Atomic because the completion callback writes the flag while this
        // constructor polls it. The store/load pair also publishes serverPeer,
        // connectResult and connectResultStr to this thread.
        // NOTE(review): presumably the callback runs on a connection thread - confirm.
        std::atomic<bool> connected(false);

        connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr)
        {
            serverPeer = peer;
            connectResult = result;
            connectResultStr = resultStr;
            // Must stay last: everything written above becomes visible to the waiter.
            connected.store(true);
        }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));

        // The captured locals must outlive the callback, so wait here until it has fired.
        while(!connected.load())
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

        if(connectResult != PubSubResult::OK)
        {
            std::string errMsg = "Failed to connect to bootstrap node, error: ";
            errMsg += connectResultStr;
            throw BootstrapConnectionException(errMsg);
        }
    }

    // TODO: This is vulnerable to MitM attacks. Replace with asymmetric cryptography:
    // have the data signed with the server's private key and verify it against the
    // known server public key.
    /// Handles a message from the bootstrap node.
    ///
    /// The message starts with a pubsub key of PUBSUB_KEY_LENGTH bytes, followed by
    /// zero or more peer records (address family, IPv4 address, port). For every
    /// AF_INET record a connection to that peer is started; peers that connect
    /// successfully are stored in subscribedPeers under the key, and data received
    /// from them is forwarded to the listener registered for the key.
    ///
    /// If no listener is registered for the key the message is ignored. An unknown
    /// address family is logged and stops processing of the remaining records.
    void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, const void *data, const usize size)
    {
        Log::debug("BootstrapConnection: Received subscriber(s) from bootstrap node");
        sibs::SafeDeserializer deserializer(static_cast<const u8*>(data), size);
        PubsubKey pubsubKey;
        deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);

        // Copy the listener under the lock and use the copy afterwards, so the
        // mutex is not held while connections are being set up. The guard releases
        // the mutex on every exit path.
        BoostrapConnectionListenCallbackFunc listenerCallbackFunc;
        {
            std::lock_guard lock(listenerCallbackFuncMutex);
            auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
            if(listenerFuncIt == listenCallbackFuncs.end())
            {
                Log::debug("BootstrapConnection: No listener found for key XXX, ignoring...");
                return;
            }
            listenerCallbackFunc = listenerFuncIt->second;
        }

        while(!deserializer.empty())
        {
            sa_family_t addressFamily = deserializer.extract();
            if(addressFamily == AF_INET)
            {
                in_addr_t ipv4Address = deserializer.extract();
                u16 port = deserializer.extract();
                Ipv4 newPeerAddress;
                // Address and port are stored exactly as received, no byte-order conversion.
                newPeerAddress.address.sin_family = addressFamily;
                newPeerAddress.address.sin_addr.s_addr = ipv4Address;
                newPeerAddress.address.sin_port = port;
                memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
                connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr newPeer, PubSubResult result, const std::string &resultStr)
                {
                    if(result == PubSubResult::OK)
                    {
                        {
                            // Scoped guard: a throwing push_back can no longer leave the mutex locked.
                            std::lock_guard lock(subscribedPeersMutex);
                            subscribedPeers[pubsubKey].push_back(newPeer);
                        }
                        Log::debug("BootstrapConnection: Connected to peer (ip: %s, port: %d) given by bootstrap node", newPeer->address.getAddress().c_str(), newPeer->address.getPort());
                    }
                    else
                        Log::error("BootstrapConnection: Failed to connect to peer given by bootstrap node, error: %s", resultStr.c_str());
                }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, listenerCallbackFunc, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            }
            else
            {
                Log::error("BootstrapConnection: Unknown address family: %d", addressFamily);
                return;
            }
        }
    }

    /// Forwards data received from a subscribed peer to @p listenCallbackFunc.
    /// Does nothing if the callback is empty. @p peer is not used.
    void BootstrapConnection::receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr peer, const void *data, const usize size)
    {
        if(listenCallbackFunc)
            listenCallbackFunc(data, size);
    }

    /// Registers @p callbackFunc as the listener for @p pubsubKey and sends the key
    /// to the bootstrap node.
    ///
    /// @throws PubsubKeyAlreadyListeningException if a listener is already registered for the key.
    void BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
    {
        {
            std::lock_guard lock(listenerCallbackFuncMutex);
            if(listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end())
                throw PubsubKeyAlreadyListeningException("");
            listenCallbackFuncs[pubsubKey] = callbackFunc;
        }

        connections.send(serverPeer, std::make_shared>(pubsubKey.data.begin(), pubsubKey.data.end()), [](PubSubResult result, const std::string &resultStr)
        {
            // Cast so that an int is what reaches the %d conversion.
            Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", static_cast<int>(result), resultStr.c_str());
        });
    }

    /// Delivers @p data to the local listener registered for @p pubsubKey (if any)
    /// and then sends it to every peer stored in subscribedPeers for that key.
    void BootstrapConnection::put(const PubsubKey &pubsubKey, std::shared_ptr> data)
    {
        // Copy the listener under the lock but invoke it after releasing the lock,
        // so a listener that calls listen() or put() cannot deadlock on
        // listenerCallbackFuncMutex.
        BoostrapConnectionListenCallbackFunc localListener;
        {
            std::lock_guard lock(listenerCallbackFuncMutex);
            auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey);
            if(listenCallbackFuncIt != listenCallbackFuncs.end())
                localListener = listenCallbackFuncIt->second;
        }
        if(localListener)
            localListener(data->data(), data->size());

        std::lock_guard lock(subscribedPeersMutex);
        auto peersIt = subscribedPeers.find(pubsubKey);
        if(peersIt == subscribedPeers.end())
            return;

        for(auto &peer : peersIt->second)
        {
            connections.send(peer, data);
        }
    }
}