#include "../include/sibs/BootstrapNode.hpp"
#include "../include/Log.hpp"
// NOTE(review): the angle-bracket include targets and the template arguments
// (DirectConnectionPeer, std::vector<u8>) were missing from the reviewed copy of
// this file and have been reconstructed from the symbols used below - confirm
// them against the headers and the build before merging.
#ifndef WIN32
    #include <arpa/inet.h>
    #include <netdb.h>
#else
    #include <winsock2.h>
    #include <ws2tcpip.h>
#endif
#include <udt/udt.h>
#include <sibs/SafeSerializer.hpp>
#include <sibs/SafeDeserializer.hpp>
#include <chrono>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace sibs
{
    // Creates the socket for |address| through |connections|, puts it into
    // listening state (backlog of 10) and starts the thread that accepts peers.
    //
    // Throws BootstrapException if UDT::listen fails.
    BootstrapNode::BootstrapNode(const Ipv4 &address) :
        socket(connections.createSocket(address, false, true))
    {
        if(UDT::listen(socket, 10) == UDT::ERROR)
        {
            // Read the error description before closing the socket, since the
            // close call may overwrite the last UDT error.
            std::string errMsg = "UDT: Failed to listen, error: ";
            errMsg += UDT::getlasterror_desc();
            // The destructor does not run when the constructor throws, so the
            // socket has to be released here.
            UDT::close(socket);
            throw BootstrapException(errMsg);
        }

        acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this);
    }

    // Stops the accept loop and waits for its thread to finish.
    BootstrapNode::~BootstrapNode()
    {
        // The flag must be cleared before the socket is closed: the accept thread
        // checks it when UDT::accept fails to tell a shutdown from a real error.
        connections.alive = false;
        UDT::close(socket);
        acceptConnectionsThread.join();
    }

    // Thread body: accepts incoming connections for as long as connections.alive
    // is set. Every accepted socket is added to the epoll set of |connections| and
    // stored in connections.peers with peerSubscribe as its receive callback.
    void BootstrapNode::acceptConnections()
    {
        sockaddr_storage clientAddr;

        while(connections.alive)
        {
            // Reset on every iteration so each accept/getnameinfo pair starts from
            // the full size of the address buffer.
            int addrLen = sizeof(clientAddr);
            const UDTSOCKET clientSocket = UDT::accept(socket, reinterpret_cast<sockaddr*>(&clientAddr), &addrLen);
            if(clientSocket == UDT::INVALID_SOCK)
            {
                // Connection was killed because bootstrap node was taken down
                if(!connections.alive)
                    return;

                Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc());
                // Short pause so a persistent accept failure does not spin the thread
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }

            char clientHost[NI_MAXHOST];
            char clientService[NI_MAXSERV];
            const char *hostText = clientHost;
            const char *serviceText = clientService;
            const int nameInfoResult = getnameinfo(reinterpret_cast<sockaddr*>(&clientAddr), addrLen,
                                                   clientHost, sizeof(clientHost),
                                                   clientService, sizeof(clientService),
                                                   NI_NUMERICHOST | NI_NUMERICSERV);
            if(nameInfoResult != 0)
            {
                // The buffers are not guaranteed to hold valid strings after a
                // failure, so never hand them to the logger.
                hostText = "<unknown>";
                serviceText = "<unknown>";
            }
            Log::debug("UDT: New connection: %s:%s (socket: %d)", hostText, serviceText, clientSocket);

            if(UDT::epoll_add_usock(connections.eid, clientSocket) == UDT::ERROR)
            {
                // A socket that is not part of the epoll set would be registered as
                // a peer without ever being polled; drop it instead.
                Log::error("UDT: Failed to add peer socket to epoll, error: %s", UDT::getlasterror_desc());
                UDT::close(clientSocket);
                continue;
            }

            std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
            peer->socket = clientSocket;
            // Only the leading sockaddr_in-sized part of the storage is kept as the peer address
            const sockaddr_in *clientAddrSock = reinterpret_cast<const sockaddr_in*>(&clientAddr);
            std::memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
            peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this,
                                                      std::placeholders::_1,
                                                      std::placeholders::_2,
                                                      std::placeholders::_3);

            {
                // Scoped guard: the mutex is released even if the map insertion throws
                std::lock_guard<decltype(connections.peersMutex)> peersLock(connections.peersMutex);
                connections.peers[clientSocket] = peer;
            }
        }
    }

    // Receive callback for accepted peers: handles a subscribe request.
    //
    // |data| / |size| is the received message; its first PUBSUB_KEY_LENGTH bytes are
    // read as the pubsub key. If a peer with the same IPv4 address (the port is not
    // compared) is already subscribed to that key, nothing is done. Otherwise:
    //   - every peer already subscribed to the key is sent: key, followed by the
    //     new peer's address as (u32 family, u32 address, u16 port);
    //   - |newPeer| is appended to the key's subscriber list;
    //   - |newPeer| is sent: key, followed by one (u32 family, u32 address, u16 port)
    //     entry for each peer that was subscribed before it.
    void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
    {
        Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", newPeer->address.getAddress().c_str(), newPeer->address.getPort());

        sibs::SafeDeserializer deserializer(static_cast<const u8*>(data), size);
        PubsubKey pubsubKey;
        deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);

        // operator[] creates an empty subscriber list the first time a key is seen
        auto &peers = subscribedPeers[pubsubKey];
        for(const auto &peer : peers)
        {
            if(peer->address.address.sin_addr.s_addr == newPeer->address.address.sin_addr.s_addr)
                return;
        }

        // Message announcing the new peer to the peers that are already subscribed
        sibs::SafeSerializer serializer;
        serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
        serializer.add(static_cast<u32>(newPeer->address.address.sin_family));
        serializer.add(static_cast<u32>(newPeer->address.address.sin_addr.s_addr));
        serializer.add(static_cast<u16>(newPeer->address.address.sin_port));
        // One shared buffer is handed to every send call below
        std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));

        // Message listing the already subscribed peers, built while notifying them
        sibs::SafeSerializer newPeerSerializer;
        newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
        for(auto &peer : peers)
        {
            connections.send(peer, serializerData);
            newPeerSerializer.add(static_cast<u32>(peer->address.address.sin_family));
            newPeerSerializer.add(static_cast<u32>(peer->address.address.sin_addr.s_addr));
            newPeerSerializer.add(static_cast<u16>(peer->address.address.sin_port));
        }

        // Added after the loop so the new peer is neither notified about itself nor
        // listed in its own peer list
        peers.push_back(newPeer);
        connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())));
    }
}