From eda94456add9a65d1821302e343bef4021d2a773 Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Tue, 16 Oct 2018 00:37:21 +0200 Subject: Reuse peer connection if subscribed to same key --- src/BootstrapNode.cpp | 117 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 76 insertions(+), 41 deletions(-) (limited to 'src/BootstrapNode.cpp') diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index 273abf0..5c62e64 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -14,9 +14,14 @@ namespace sibs { BootstrapNode::BootstrapNode(const Ipv4 &address) : - connections(27130), + connections(address.getPort()), socket(connections.createSocket(address, false, true)) { + if(connections.port != address.getPort()) + { + throw SocketCreateException("BootstrapNode: Failed to bind port " + std::to_string(address.getPort())); + } + connections.onRemoveDisconnectedPeer([this](std::shared_ptr peer) { std::lock_guard lock(subscribedPeersMutex); @@ -32,7 +37,7 @@ namespace sibs } }); - if(UDT::listen(socket, 10) == UDT::ERROR) + if(UDT::listen(socket->udtSocket, 10) == UDT::ERROR) { std::string errMsg = "UDT: Failed to listen, error: "; errMsg += UDT::getlasterror_desc(); @@ -43,8 +48,9 @@ namespace sibs BootstrapNode::~BootstrapNode() { + std::lock_guard lock(connections.peersMutex); connections.alive = false; - UDT::close(socket); + socket.reset(); acceptConnectionsThread.join(); } @@ -55,8 +61,8 @@ namespace sibs while(connections.alive) { - UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen); - if(clientSocket == UDT::INVALID_SOCK) + UDTSOCKET clientUdtSocket = UDT::accept(socket->udtSocket, (sockaddr*)&clientAddr, &addrLen); + if(clientUdtSocket == UDT::INVALID_SOCK) { // Connection was killed because bootstrap node was taken down if(!connections.alive) @@ -65,61 +71,90 @@ namespace sibs std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } + auto clientSocket = std::make_unique(clientUdtSocket); char clientHost[NI_MAXHOST]; char clientService[NI_MAXSERV]; getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV); - Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket); + Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket); - UDT::epoll_add_usock(connections.eid, clientSocket); + std::lock_guard lock(connections.peersMutex); + UDT::epoll_add_usock(connections.eid, clientUdtSocket); + clientSocket->eid = connections.eid; std::shared_ptr peer = std::make_shared(); - peer->socket = clientSocket; + peer->socket = std::move(clientSocket); sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr; memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address)); - peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); - connections.peersMutex.lock(); - connections.peers[clientSocket] = peer; - connections.peersMutex.unlock(); + peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::messageFromClient, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); + connections.peers[clientUdtSocket] = peer; } } - void BootstrapNode::peerSubscribe(std::shared_ptr newPeer, const void *data, const usize size) + void BootstrapNode::messageFromClient(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { - Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", newPeer->address.getAddress().c_str(), newPeer->address.getPort()); sibs::SafeDeserializer deserializer((const u8*)data, size); PubsubKey pubsubKey; deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH); - - std::lock_guard lock(subscribedPeersMutex); - auto &peers = subscribedPeers[pubsubKey]; - for(auto &peer : peers) + + if(messageType == MessageType::SUBSCRIBE) { - if(peer->address == newPeer->address) - return; + Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + std::lock_guard lock(subscribedPeersMutex); + auto &peers = subscribedPeers[pubsubKey]; + for(auto &existingPeer : peers) + { + if(existingPeer->address == peer->address) + return; + } + + sibs::SafeSerializer serializer; + serializer.add((u32)peer->address.address.sin_family); + serializer.add((u32)peer->address.address.sin_addr.s_addr); + serializer.add((u16)peer->address.address.sin_port); + auto newPeerMessage = std::make_shared(MessageType::SUBSCRIBE); + newPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size()); + auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) + { + Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str()); + }; + for(auto &existingPeer : peers) + { + connections.send(existingPeer, newPeerMessage, sendCallbackFunc); + } + + sibs::SafeSerializer newPeerSerializer; + for(auto &existingPeer : peers) + { + newPeerSerializer.add((u32)existingPeer->address.address.sin_family); + newPeerSerializer.add((u32)existingPeer->address.address.sin_addr.s_addr); + newPeerSerializer.add((u16)existingPeer->address.address.sin_port); + } + peers.push_back(peer); + + auto existingPeerMessage = std::make_shared(MessageType::SUBSCRIBE); + existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + existingPeerMessage->append(newPeerSerializer.getBuffer().data(), newPeerSerializer.getBuffer().size()); + connections.send(peer, existingPeerMessage, sendCallbackFunc); } - - sibs::SafeSerializer serializer; - serializer.add(pubsubKey.data.data(), pubsubKey.data.size()); - serializer.add((u32)newPeer->address.address.sin_family); - serializer.add((u32)newPeer->address.address.sin_addr.s_addr); - serializer.add((u16)newPeer->address.address.sin_port); - std::shared_ptr> serializerData = std::make_shared>(std::move(serializer.getBuffer())); - - auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) + else if(messageType == MessageType::UNSUBSCRIBE) { - Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str()); - }; - - sibs::SafeSerializer newPeerSerializer; - newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size()); - for(auto &peer : peers) + Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + std::lock_guard lock(subscribedPeersMutex); + auto &peers = subscribedPeers[pubsubKey]; + for(auto it = peers.begin(); it != peers.end(); ++it) + { + auto existingPeer = *it; + if(existingPeer->address == peer->address) + { + peers.erase(it); + break; + } + } + } + else { - connections.send(peer, serializerData, sendCallbackFunc); - newPeerSerializer.add((u32)peer->address.address.sin_family); - newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr); - newPeerSerializer.add((u16)peer->address.address.sin_port); + Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe"); } - peers.push_back(newPeer); - connections.send(newPeer, std::make_shared>(std::move(newPeerSerializer.getBuffer())), sendCallbackFunc); } } -- cgit v1.2.3