// NOTE(review): the reviewed copy of this file had every `<...>` span missing
// (system header names and template arguments). They are restored below from
// how each name is used; confirm DirectConnectionPeer / Socket / Message and
// the header list against BootstrapNode.hpp and the build before merging.
#include "../include/sibs/BootstrapNode.hpp"
#include "../include/Log.hpp"
#ifndef WIN32
    #include <arpa/inet.h>
    #include <netdb.h>
#else
    #include <winsock2.h>
    #include <ws2tcpip.h>
#endif
#include <udt/udt.h>
#include <sibs/SafeSerializer.hpp>
#include <sibs/SafeDeserializer.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

namespace sibs
{
    namespace
    {
        // Appends an IPv4 endpoint to @serializer as three fields, in this order:
        // sin_family (u16), sin_addr.s_addr (u32), sin_port (u16).
        // The values are copied as stored in the sockaddr_in, without byte swapping.
        template <typename Serializer>
        void addAddress(Serializer &serializer, const Ipv4 &endpoint)
        {
            serializer.add(static_cast<u16>(endpoint.address.sin_family));
            serializer.add(static_cast<u32>(endpoint.address.sin_addr.s_addr));
            serializer.add(static_cast<u16>(endpoint.address.sin_port));
        }
    }

    // Creates the listening socket for @address, starts listening (backlog 10)
    // and spawns the thread that runs acceptConnections().
    //
    // Throws SocketCreateException if the requested port is 0 or differs from
    // connections.port after construction.
    // Throws BootstrapException if UDT::listen fails.
    BootstrapNode::BootstrapNode(const Ipv4 &address) :
        connections(address.getPort()),
        socket(connections.createSocket(address, false, true))
    {
        if(address.getPort() == 0 || connections.port != address.getPort())
        {
            throw SocketCreateException("BootstrapNode: Failed to bind port " + std::to_string(address.getPort()));
        }

        // When a peer is dropped by the connection layer, remove it from every
        // topic it was subscribed to. Peers are matched by address.
        connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
        {
            std::lock_guard<decltype(subscribedPeersMutex)> lock(subscribedPeersMutex);
            for(auto &topicUsers : subscribedPeers)
            {
                auto &users = topicUsers.second;
                for(auto it = users.begin(); it != users.end(); )
                {
                    if(peer->address == (*it)->address)
                        it = users.erase(it);
                    else
                        ++it;
                }
            }
        });

        if(UDT::listen(socket->udtSocket, 10) == UDT::ERROR)
        {
            std::string errMsg = "UDT: Failed to listen, error: ";
            errMsg += UDT::getlasterror_desc();
            throw BootstrapException(errMsg);
        }

        acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this);
    }

    // Stops the accept loop: clears connections.alive, releases the listening
    // socket and waits for the accept thread to finish. acceptConnections()
    // returns when accept fails while connections.alive is false.
    BootstrapNode::~BootstrapNode()
    {
        connections.alive = false;
        socket.reset();
        acceptConnectionsThread.join();
    }

    // Thread body: accepts incoming connections until connections.alive is
    // false, registering every accepted socket as a peer in connections.peers
    // with messageFromClient() as its receive callback.
    void BootstrapNode::acceptConnections()
    {
        // The destructor resets `socket` while this thread may still be running,
        // so the handle is read once here instead of dereferencing `socket` on
        // every iteration.
        const UDTSOCKET listenSocket = socket->udtSocket;

        while(connections.alive)
        {
            sockaddr_storage clientAddr;
            int addrLen = sizeof(clientAddr);
            const UDTSOCKET clientUdtSocket = UDT::accept(listenSocket, reinterpret_cast<sockaddr*>(&clientAddr), &addrLen);
            if(clientUdtSocket == UDT::INVALID_SOCK)
            {
                // Connection was killed because bootstrap node was taken down
                if(!connections.alive)
                    return;
                Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc());
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }

            auto clientSocket = std::make_unique<Socket>(clientUdtSocket);

            char clientHost[NI_MAXHOST];
            char clientService[NI_MAXSERV];
            const int nameResult = getnameinfo(reinterpret_cast<sockaddr*>(&clientAddr), addrLen,
                clientHost, sizeof(clientHost),
                clientService, sizeof(clientService),
                NI_NUMERICHOST | NI_NUMERICSERV);
            if(nameResult != 0)
            {
                // The buffers are not guaranteed to hold a string on failure;
                // give the log statement below something defined to print.
                std::strcpy(clientHost, "unknown");
                std::strcpy(clientService, "unknown");
            }
            Log::debug("BootstrapNode::acceptConnections: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket);

            std::lock_guard<decltype(connections.peersMutex)> lock(connections.peersMutex);
            UDT::epoll_add_usock(connections.eid, clientUdtSocket);
            clientSocket->eid = connections.eid;

            auto peer = std::make_shared<DirectConnectionPeer>();
            peer->socket = std::move(clientSocket);
            // Only the leading sockaddr_in-sized part of the storage is kept.
            std::memcpy(&peer->address.address, &clientAddr, sizeof(peer->address.address));
            peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::messageFromClient, this,
                std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
            connections.peers[clientUdtSocket] = peer;
        }
    }

    // Handles one message received from a connected peer.
    //
    // @peer         The peer the message came from.
    // @messageType  SUBSCRIBE, UNSUBSCRIBE or DATA; anything else is logged and ignored.
    // @data, @size  The message payload, parsed with sibs::SafeDeserializer.
    //
    // SUBSCRIBE:   payload is a pubsub key. If the peer is not already subscribed
    //              to it, every current subscriber is sent a SUBSCRIBE message with
    //              the key and the new peer's address, and the new peer is sent a
    //              SUBSCRIBE message with the key and all current subscribers' addresses.
    // UNSUBSCRIBE: payload is a pubsub key; the peer is removed from that topic.
    // DATA:        payload is a target address (u16 family, u32 address, u16 port)
    //              followed by a pubsub key. The message is forwarded to the subscriber
    //              of that key with the matching address, prefixed with the sender's address.
    //
    // NOTE(review): the deserializer presumably throws on a truncated payload;
    // confirm the caller of this callback handles that.
    void BootstrapNode::messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
    {
        sibs::SafeDeserializer deserializer(static_cast<const u8*>(data), size);

        auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
        {
            Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", static_cast<int>(result), resultStr.c_str());
        };

        if(messageType == MessageType::SUBSCRIBE)
        {
            Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
            PubsubKey pubsubKey;
            deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());

            std::lock_guard<decltype(subscribedPeersMutex)> lock(subscribedPeersMutex);
            auto &peers = subscribedPeers[pubsubKey];
            for(auto &existingPeer : peers)
            {
                // Already subscribed to this key; nothing to announce.
                if(existingPeer->address == peer->address)
                    return;
            }

            sibs::SafeSerializer serializer;
            addAddress(serializer, peer->address);
            auto newPeerMessage = std::make_shared<Message>(MessageType::SUBSCRIBE);
            newPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
            newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size());

            // Announce the new peer to every current subscriber and, in the same
            // pass, collect their addresses for the reply to the new peer.
            sibs::SafeSerializer newPeerSerializer;
            for(auto &existingPeer : peers)
            {
                connections.send(existingPeer, newPeerMessage, sendCallbackFunc);
                addAddress(newPeerSerializer, existingPeer->address);
            }
            peers.push_back(peer);

            auto existingPeerMessage = std::make_shared<Message>(MessageType::SUBSCRIBE);
            existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
            existingPeerMessage->append(newPeerSerializer.getBuffer().data(), newPeerSerializer.getBuffer().size());
            connections.send(peer, existingPeerMessage, sendCallbackFunc);
        }
        else if(messageType == MessageType::UNSUBSCRIBE)
        {
            Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
            PubsubKey pubsubKey;
            deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());

            std::lock_guard<decltype(subscribedPeersMutex)> lock(subscribedPeersMutex);
            // Look the key up without inserting: an unknown key sent by a remote
            // peer must not create an empty topic entry.
            const auto topicIt = subscribedPeers.find(pubsubKey);
            if(topicIt == subscribedPeers.end())
                return;

            auto &peers = topicIt->second;
            for(auto it = peers.begin(); it != peers.end(); ++it)
            {
                if((*it)->address == peer->address)
                {
                    peers.erase(it);
                    break;
                }
            }
        }
        else if(messageType == MessageType::DATA)
        {
            Log::debug("BootstrapNode: Received peer data route from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
            u16 addressType = deserializer.extract<u16>();
            u32 address = deserializer.extract<u32>();
            u16 port = deserializer.extract<u16>();
            Ipv4 targetAddress(addressType, address, port);
            PubsubKey pubsubKey;
            deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());

            std::lock_guard<decltype(subscribedPeersMutex)> lock(subscribedPeersMutex);
            // As above: look up without inserting an entry for an unknown key.
            const auto topicIt = subscribedPeers.find(pubsubKey);
            if(topicIt != subscribedPeers.end())
            {
                for(auto &existingPeer : topicIt->second)
                {
                    if(existingPeer->address == targetAddress)
                    {
                        auto existingPeerMessage = std::make_shared<Message>(MessageType::DATA);
                        addAddress(existingPeerMessage->serializer, peer->address);
                        existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
                        // Forwards the deserializer's current buffer (presumably the
                        // unread remainder of the payload — confirm).
                        existingPeerMessage->append(deserializer.getBuffer(), deserializer.getSize());
                        connections.send(existingPeer, existingPeerMessage, sendCallbackFunc);
                        return;
                    }
                }
            }
            Log::warn("BootstrapNode: Route target (ip: %s, port: %d) does not exist for pubsub key %s", targetAddress.getAddress().c_str(), targetAddress.getPort(), pubsubKey.toString().c_str());
        }
        else
        {
            Log::warn("BootstrapNode: received message from client that was not subscribe, unsubscribe or data");
        }
    }
}