#include "../include/sibs/BootstrapConnection.hpp" #include "../include/Log.hpp" #include #include #include namespace chrono = std::chrono; namespace sibs { BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress, const ConnectionOptions &options) : connections(0, options), alive(true), putThreadCount(0) { Log::debug("Created bootstrap connection, p2p? %s", options.useP2p ? "yes" : "no"); connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { std::lock_guard lock(subscribeDataMutex); for(auto &it : subscribeData) { for(auto subscribedPeersIt = it.second.peers.begin(), end = it.second.peers.end(); subscribedPeersIt != end; ++subscribedPeersIt) { if(peer->address == (*subscribedPeersIt)->address) { it.second.peers.erase(subscribedPeersIt); break; } } } }); PubSubResult connectResult = PubSubResult::RESULT_OK; std::string connectResultStr; bool connected = false; connections.connectServer(bootstrapAddress, [this, &connectResult, &connectResultStr, &connected](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { serverPeer = peer; connectResult = result; connectResultStr = resultStr; connected = true; }, std::bind(&BootstrapConnection::receiveDataFromServer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); while(!connected) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } if(connectResult != PubSubResult::RESULT_OK) { std::string errMsg = "Failed to connect to bootstrap node, error: "; errMsg += connectResultStr; throw BootstrapConnectionException(errMsg); } } std::future> BootstrapConnection::connect(const Ipv4 &bootstrapAddress, const ConnectionOptions &options) { std::promise> connectionPromise; std::future> connectionFuture = connectionPromise.get_future(); std::thread([bootstrapAddress](std::promise> connectionPromise, const ConnectionOptions options) { try { BootstrapConnection *connection = new BootstrapConnection(bootstrapAddress, options); connectionPromise.set_value(std::unique_ptr(connection)); } catch(...) { connectionPromise.set_exception(std::current_exception()); } }, std::move(connectionPromise), options).detach(); return connectionFuture; } BootstrapConnection::~BootstrapConnection() { alive = false; while(putThreadCount > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } // TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key void BootstrapConnection::receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { if(messageType == MessageType::DATA) { receiveDataFromPeer(peer, messageType, data, size); return; } else if(messageType != MessageType::SUBSCRIBE) { Log::warn("BootstrapConnection::receiveDataFromServer: received message from server that was not subscribe or data"); return; } //Log::debug("BootstrapConnection::receiveDataFromServer: Received subscriber(s) from bootstrap node"); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); // we want lock to live this whole scope so we dont connect to peer when cancelListen is called { std::lock_guard lock(subscribeDataMutex); auto subscribeDataIt = subscribeData.find(pubsubKey); if(subscribeDataIt == subscribeData.end()) { Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } } while(!deserializer.empty()) { sa_family_t addressFamily = deserializer.extract(); if(addressFamily == AF_INET) { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); Ipv4 newPeerAddress(addressFamily, ipv4Address, port); //Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { std::lock_guard lock(subscribeDataMutex); auto subscribeDataIt = subscribeData.find(pubsubKey); if(subscribeDataIt == subscribeData.end()) { Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } for(auto existingPeer : subscribeDataIt->second.peers) { if(peer == existingPeer) { Log::debug("BootstrapConnection::receiveDataFromServer: Already connected to peer (ip: %s, port: %d) for pubsub key %s", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); return; } } subscribeDataIt->second.peers.push_back(peer); if(result == PubSubResult::RESULT_OK) Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); else Log::warn("BootstrapConnection::receiveDataFromServer: Failed to connect to peer given by bootstrap node, error: %s. Routing data through bootstrap node", resultStr.c_str()); }, std::bind(&BootstrapConnection::receiveDataFromPeer, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); } else { Log::error("BootstrapConnection::receiveDataFromServer: Unknown address family: %d", addressFamily); return; } } } void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { sibs::SafeDeserializer deserializer((const u8*)data, size); Ipv4 targetAddress; if(peer == serverPeer) { u16 addressType = deserializer.extract(); u32 address = deserializer.extract(); u16 port = deserializer.extract(); targetAddress = Ipv4(addressType, address, port); } PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); if(messageType == MessageType::DATA) { subscribeDataMutex.lock(); auto listenerFuncIt = subscribeData.find(pubsubKey); if(listenerFuncIt == subscribeData.end()) { subscribeDataMutex.unlock(); Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); return; } auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; if(peer == serverPeer) { for(auto pubsubPeer : listenerFuncIt->second.peers) { if(pubsubPeer->address == targetAddress) { peer = pubsubPeer; Log::debug("BootstrapConnection::receiveDataFromPeer: Received routed data from server, originator peer: ip: %s, port: %d", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); break; } } if(peer == serverPeer) { Log::warn("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); subscribeDataMutex.unlock(); return; } } subscribeDataMutex.unlock(); if(listenCallbackFunc) { bool continueListening = listenCallbackFunc(peer.get(), deserializer.getBuffer(), deserializer.getSize()); if(!continueListening) cancelListen({ pubsubKey }); } } else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); std::lock_guard subscribersMutex(subscribeDataMutex); auto peersListIt = subscribeData.find(pubsubKey); if(peersListIt == subscribeData.end()) return; for(auto it = peersListIt->second.peers.begin(); it != peersListIt->second.peers.end(); ++it) { auto existingPeer = *it; if(*existingPeer == *peer) { peersListIt->second.peers.erase(it); --peer->sharedKeys; if(peer->sharedKeys <= 0) connections.removePeer(peer->socket->udtSocket); break; } } } else { Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe"); } } ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc) { { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(pubsubKey); if(it != subscribeData.end()) { Log::warn("BootstrapConnection::listen called on existing listener, overwriting callback function"); if(registerCallbackFunc) it->second.listenCallbackFunc = callbackFunc; return { pubsubKey }; } SubscribeData data; if(registerCallbackFunc) data.listenCallbackFunc = callbackFunc; data.listenStartTimeMs = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); subscribeData[pubsubKey] = data; } auto message = std::make_shared(MessageType::SUBSCRIBE); message->append(pubsubKey.data.data(), pubsubKey.data.size()); Log::debug("BootstrapConnection::listen: starting to listen to %s", pubsubKey.toString().c_str()); connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr) { Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str()); }); return { pubsubKey }; } ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc) { return listen(pubsubKey, callbackFunc, true); } bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size) { if(size > 819200) // 800kb { Log::error("No data was sent because you are trying to send more than 800kb"); return false; } { std::lock_guard lock(subscribeDataMutex); auto listenCallbackFuncIt = subscribeData.find(pubsubKey); if(listenCallbackFuncIt != subscribeData.end() && listenCallbackFuncIt->second.listenCallbackFunc) listenCallbackFuncIt->second.listenCallbackFunc(nullptr, data, size); if(listenCallbackFuncIt == subscribeData.end()) { Log::warn("BootstrapConnection::put on key '%s' which we are not listening to, automatically listening now", pubsubKey.toString().c_str()); listen(pubsubKey, nullptr, false); } } std::vector containedData((unsigned char*)data, (unsigned char*)data + size); auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); message->append(containedData.data(), containedData.size()); std::thread([this, pubsubKey, message](std::vector containedData) { ++putThreadCount; std::unordered_set peersMessaged; std::vector> peersToMessage; i64 listenStartTimeMs = 0; while(alive) { { std::lock_guard lock(subscribeDataMutex); auto subscribeIt = subscribeData.find(pubsubKey); if(subscribeIt == subscribeData.end()) { Log::warn("BootstrapConnection::put: listen cancelled for '%s', stopping put attempts", pubsubKey.toString().c_str()); break; } listenStartTimeMs = subscribeIt->second.listenStartTimeMs; for(auto &peer : subscribeIt->second.peers) { if(peersMessaged.find(peer->socket->udtSocket) == peersMessaged.end()) { peersToMessage.push_back(peer); peersMessaged.insert(peer->socket->udtSocket); } } } if(!peersToMessage.empty()) { for(auto &peer : peersToMessage) { if(peer->routed) { auto routedMessage = std::make_shared(MessageType::DATA); routedMessage->serializer.add((u16)peer->address.address.sin_family); routedMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr); routedMessage->serializer.add((u16)peer->address.address.sin_port); routedMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); routedMessage->append(containedData.data(), containedData.size()); connections.send(serverPeer, routedMessage); } else { connections.send(peer, message); } } peersToMessage.clear(); } const i64 half_min_ms = 1000 * 30; i64 now = chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count(); if(now - listenStartTimeMs > half_min_ms) break; std::this_thread::sleep_for(std::chrono::milliseconds(200)); } --putThreadCount; }, std::move(containedData)).detach(); return true; } bool BootstrapConnection::cancelListen(const ListenHandle &listener) { std::vector> peersToMessage; { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(listener.key); if(it == subscribeData.end()) return false; peersToMessage = it->second.peers; } auto message = std::make_shared(MessageType::UNSUBSCRIBE); message->append(listener.key.data.data(), listener.key.data.size()); connections.send(serverPeer, message); // TODO: Unsubscribe to peers we are connected to indirectly (routed data) for(auto &peer : peersToMessage) { --peer->sharedKeys; if(peer->sharedKeys <= 0) { // disconnect from peer connections.removePeer(peer->socket->udtSocket); } else { // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore connections.send(peer, message); } } { std::lock_guard lock(subscribeDataMutex); auto it = subscribeData.find(listener.key); if(it != subscribeData.end()) subscribeData.erase(it); } return true; } bool BootstrapConnection::areWeListeningOnKey(const PubsubKey &pubsubKey) { std::lock_guard lock(subscribeDataMutex); return subscribeData.find(pubsubKey) != subscribeData.end(); } std::vector> BootstrapConnection::getPeers() { return connections.getPeers(); } }