From 81b59a08b69d4872b06930aedd06a957c4acb7cc Mon Sep 17 00:00:00 2001 From: Aleksi Lindeman Date: Fri, 23 Nov 2018 15:44:08 +0100 Subject: Add data routing for failed p2p connections --- include/sibs/DirectConnection.hpp | 1 + include/sibs/IpAddress.hpp | 2 ++ include/sibs/Message.hpp | 12 +++---- src/BootstrapConnection.cpp | 66 ++++++++++++++++++++++++++++++--------- src/BootstrapNode.cpp | 47 +++++++++++++++++++++++++--- src/DirectConnection.cpp | 48 +++++++++++++++++++++------- src/IpAddress.cpp | 8 +++++ src/Message.cpp | 4 +-- 8 files changed, 149 insertions(+), 39 deletions(-) diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index eb44153..70eaf4f 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -56,6 +56,7 @@ namespace sibs PubSubReceiveDataCallback receiveDataCallbackFunc; int sharedKeys = 0; PeerType type = PeerType::CLIENT; + bool routed = false; bool operator == (const DirectConnectionPeer &other) const; bool operator != (const DirectConnectionPeer &other) const; diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp index 54cb58a..ea2a7bd 100644 --- a/include/sibs/IpAddress.hpp +++ b/include/sibs/IpAddress.hpp @@ -4,6 +4,7 @@ #include #include #include "../env.hpp" +#include "../types.hpp" #if OS_FAMILY != OS_FAMILY_WINDOWS #include #include @@ -32,6 +33,7 @@ namespace sibs // If @ip is nullptr, then bind to all available sockets (typical for servers) // Throws InvalidAddressException on error. Ipv4(const char *ip, unsigned short port); + Ipv4(u16 family, u32 address, u16 port); Ipv4(const Ipv4 &other); Ipv4& operator = (const Ipv4 &other); diff --git a/include/sibs/Message.hpp b/include/sibs/Message.hpp index b6eb858..4591221 100644 --- a/include/sibs/Message.hpp +++ b/include/sibs/Message.hpp @@ -1,7 +1,7 @@ #pragma once #include "../types.hpp" -#include +#include namespace sibs { @@ -20,11 +20,11 @@ namespace sibs void append(const void *data, const usize size); - usize getDataSize() const { return rawData.size() - 1; } - usize getRawSize() const { return rawData.size(); } + usize getDataSize() const { return serializer.getSize() - 1; } + usize getRawSize() const { return serializer.getSize(); } - const u8* data() const { return rawData.data(); } - private: - std::vector rawData; + const u8* data() { return serializer.getBuffer().data(); } + + sibs::SafeSerializer serializer; }; } \ No newline at end of file diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index 4ace97c..d914646 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -114,11 +114,7 @@ namespace sibs { in_addr_t ipv4Address = deserializer.extract(); u16 port = deserializer.extract(); - Ipv4 newPeerAddress; - newPeerAddress.address.sin_family = addressFamily; - newPeerAddress.address.sin_addr.s_addr = ipv4Address; - newPeerAddress.address.sin_port = port; - memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero)); + Ipv4 newPeerAddress(addressFamily, ipv4Address, port); Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort()); connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr peer, PubSubResult result, const std::string &resultStr) { @@ -132,7 +128,6 @@ namespace sibs return; } subscribeDataIt->second.peers.push_back(peer); - ++peer->sharedKeys; Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort()); } else @@ -149,11 +144,18 @@ namespace sibs void BootstrapConnection::receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { - if(size < PUBSUB_KEY_LENGTH) - return; + sibs::SafeDeserializer deserializer((const u8*)data, size); + Ipv4 targetAddress; + if(peer == serverPeer) + { + u16 addressType = deserializer.extract(); + u32 address = deserializer.extract(); + u16 port = deserializer.extract(); + targetAddress = Ipv4(addressType, address, port); + } PubsubKey pubsubKey; - memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH); + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); if(messageType == MessageType::DATA) { subscribeDataMutex.lock(); @@ -165,11 +167,30 @@ namespace sibs return; } auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc; + if(peer == serverPeer) + { + for(auto pubsubPeer : listenerFuncIt->second.peers) + { + if(pubsubPeer->address == targetAddress) + { + peer = pubsubPeer; + Log::debug("BootstrapConnection::receiveDataFromPeer: Received routed data from server, originator peer: ip: %s, port: %d", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str()); + break; + } + } + + if(peer == serverPeer) + { + Log::warn("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str()); + subscribeDataMutex.unlock(); + return; + } + } subscribeDataMutex.unlock(); if(listenCallbackFunc) { - bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH); + bool continueListening = listenCallbackFunc(peer.get(), deserializer.getBuffer(), deserializer.getSize()); if(!continueListening) cancelListen({ pubsubKey }); } @@ -257,11 +278,12 @@ namespace sibs listen(pubsubKey, nullptr, false); } } - + + std::vector containedData((unsigned char*)data, (unsigned char*)data + size); auto message = std::make_shared(MessageType::DATA); message->append(pubsubKey.data.data(), pubsubKey.data.size()); - message->append(data, size); - std::thread([this, pubsubKey, message]() + message->append(containedData.data(), containedData.size()); + std::thread([this, pubsubKey, message](std::vector containedData) { ++putThreadCount; std::unordered_set peersMessaged; @@ -294,7 +316,20 @@ namespace sibs { for(auto &peer : peersToMessage) { - connections.send(peer, message); + if(peer->routed) + { + auto routedMessage = std::make_shared(MessageType::DATA); + routedMessage->serializer.add((u16)peer->address.address.sin_family); + routedMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr); + routedMessage->serializer.add((u16)peer->address.address.sin_port); + routedMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + routedMessage->append(containedData.data(), containedData.size()); + connections.send(serverPeer, routedMessage); + } + else + { + connections.send(peer, message); + } } peersToMessage.clear(); } @@ -307,7 +342,7 @@ namespace sibs std::this_thread::sleep_for(std::chrono::milliseconds(200)); } --putThreadCount; - }).detach(); + }, std::move(containedData)).detach(); return true; } @@ -327,6 +362,7 @@ namespace sibs message->append(listener.key.data.data(), listener.key.data.size()); connections.send(serverPeer, message); + // TODO: Unsubscribe to peers we are connected to indirectly (routed data) for(auto &peer : peersToMessage) { --peer->sharedKeys; diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp index f6fc0c3..3def1f7 100644 --- a/src/BootstrapNode.cpp +++ b/src/BootstrapNode.cpp @@ -92,12 +92,11 @@ namespace sibs void BootstrapNode::messageFromClient(std::shared_ptr peer, MessageType messageType, const void *data, const usize size) { sibs::SafeDeserializer deserializer((const u8*)data, size); - PubsubKey pubsubKey; - deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); - if(messageType == MessageType::SUBSCRIBE) { Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); std::lock_guard lock(subscribedPeersMutex); auto &peers = subscribedPeers[pubsubKey]; for(auto &existingPeer : peers) @@ -115,7 +114,7 @@ namespace sibs newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size()); auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) { - Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str()); + Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str()); }; for(auto &existingPeer : peers) { @@ -139,6 +138,8 @@ namespace sibs else if(messageType == MessageType::UNSUBSCRIBE) { Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); std::lock_guard lock(subscribedPeersMutex); auto &peers = subscribedPeers[pubsubKey]; for(auto it = peers.begin(); it != peers.end(); ++it) @@ -151,9 +152,45 @@ namespace sibs } } } + else if(messageType == MessageType::DATA) + { + Log::debug("BootstrapNode: Received peer data route from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort()); + u16 addressType = deserializer.extract(); + u32 address = deserializer.extract(); + u16 port = deserializer.extract(); + Ipv4 targetAddress(addressType, address, port); + PubsubKey pubsubKey; + deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size()); + auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr) + { + Log::debug("BootstrapNode::messageFromClient send result: %d, result string: %s", result, resultStr.c_str()); + }; + + std::lock_guard lock(subscribedPeersMutex); + auto &peers = subscribedPeers[pubsubKey]; + for(auto &existingPeer : peers) + { + if(existingPeer->address == targetAddress) + { + auto existingPeerMessage = std::make_shared(MessageType::DATA); + existingPeerMessage->serializer.add((u16)peer->address.address.sin_family); + existingPeerMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr); + existingPeerMessage->serializer.add((u16)peer->address.address.sin_port); + existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size()); + existingPeerMessage->append(deserializer.getBuffer(), deserializer.getSize()); + connections.send(existingPeer, existingPeerMessage, sendCallbackFunc); + return; + } + } + + Log::warn("BootstrapNode: Route target (ip: %s, port: %d) does not exist for pubsub key %s", + targetAddress.getAddress().c_str(), + targetAddress.getPort(), + pubsubKey.toString().c_str()); + } else { - Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe"); + Log::warn("BootstrapNode: received message from client that was not subscribe, unsubscribe or data"); } } } diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index e5a997f..cb71c8f 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -156,27 +156,53 @@ namespace sibs connectCallbackFunc(peer, PubSubResult::RESULT_ERROR, e.what()); return PubSubConnectResult { peer, PubSubResult::RESULT_ERROR, e.what() }; } + + int socketId = socket->udtSocket; + if(!server) + { + UDT::epoll_add_usock(eid, socket->udtSocket); + socket->eid = eid; + peersMutex.lock(); + peers[socket->udtSocket] = peer; + peer->socket = std::move(socket); + peer->address = address; + peer->receiveDataCallbackFunc = receiveDataCallbackFunc; + peer->type = (server ? PeerType::SERVER : PeerType::CLIENT); + peer->routed = true; + peer->sharedKeys = 1; + peersMutex.unlock(); + } Log::debug("DirectConnections: Connecting to peer (ip: %s, port: %d, rendezvous: %s)", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no"); - if(UDT::connect(socket->udtSocket, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) + if(UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { + if(!server) + peers.erase(peers.find(socketId)); + if(connectCallbackFunc) connectCallbackFunc(peer, PubSubResult::RESULT_ERROR, UDT::getlasterror_desc()); + return PubSubConnectResult{ peer, PubSubResult::RESULT_ERROR, UDT::getlasterror_desc() }; } - - UDT::epoll_add_usock(eid, socket->udtSocket); - socket->eid = eid; - peersMutex.lock(); - peers[socket->udtSocket] = peer; - peer->socket = std::move(socket); - peer->address = address; - peer->receiveDataCallbackFunc = receiveDataCallbackFunc; - peer->type = (server ? PeerType::SERVER : PeerType::CLIENT); - peersMutex.unlock(); + + if(server) + { + UDT::epoll_add_usock(eid, socketId); + socket->eid = eid; + peersMutex.lock(); + peers[socketId] = peer; + peer->socket = std::move(socket); + peer->address = address; + peer->receiveDataCallbackFunc = receiveDataCallbackFunc; + peer->type = PeerType::SERVER; + peer->sharedKeys = 1; + peersMutex.unlock(); + } + peer->routed = false; if(connectCallbackFunc) connectCallbackFunc(peer, PubSubResult::RESULT_OK, ""); + return PubSubConnectResult { peer, PubSubResult::RESULT_OK, "" }; }); connectionResultsMutex.unlock(); diff --git a/src/IpAddress.cpp b/src/IpAddress.cpp index 75e4348..76f1c63 100644 --- a/src/IpAddress.cpp +++ b/src/IpAddress.cpp @@ -194,6 +194,14 @@ namespace sibs address.sin_addr.s_addr = INADDR_ANY; memset(address.sin_zero, 0, sizeof(address.sin_zero)); } + + Ipv4::Ipv4(u16 family, u32 addr, u16 port) + { + address.sin_family = family; + address.sin_addr.s_addr = addr; + address.sin_port = port; + memset(address.sin_zero, 0, sizeof(address.sin_zero)); + } Ipv4::Ipv4(const Ipv4 &other) { diff --git a/src/Message.cpp b/src/Message.cpp index 58b39da..89973d3 100644 --- a/src/Message.cpp +++ b/src/Message.cpp @@ -5,11 +5,11 @@ namespace sibs Message::Message(MessageType messageType) { static_assert(sizeof(MessageType) == sizeof(u8), "Whoops, message type size has changed, the below code doesn't work"); - rawData.push_back((u8)messageType); + serializer.add((u8)messageType); } void Message::append(const void *data, const usize size) { - rawData.insert(rawData.end(), (const u8*)data, (const u8*)data + size); + serializer.add((const u8*)data, size); } } \ No newline at end of file -- cgit v1.2.3