From 860ba7d92731fe64186521ca442a8c7e657e97e1 Mon Sep 17 00:00:00 2001 From: Aleksi Lindeman Date: Wed, 19 Dec 2018 23:04:50 +0100 Subject: Add option to force route data --- README.md | 3 ++ include/sibs/BootstrapConnection.hpp | 4 +-- include/sibs/DirectConnection.hpp | 8 ++++- src/BootstrapConnection.cpp | 11 +++--- src/DirectConnection.cpp | 5 +-- tests/main.cpp | 70 +++++++++++++++++++++++++++++++++++- 6 files changed, 90 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 3dd0713..28ba522 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # sibs pubsub Publish/subscription using UDT for connection/data transfer + +# TODO +When routing data and remote peer disconnects, perform unsubscribe through route (bootstrap node) \ No newline at end of file diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp index 1c5c1a5..c63256a 100644 --- a/include/sibs/BootstrapConnection.hpp +++ b/include/sibs/BootstrapConnection.hpp @@ -42,7 +42,7 @@ namespace sibs DISABLE_COPY(BootstrapConnection) public: // Throws BootstrapConnectionException on error - static std::future> connect(const Ipv4 &bootstrapAddress); + static std::future> connect(const Ipv4 &bootstrapAddress, const ConnectionOptions &options = ConnectionOptions()); ~BootstrapConnection(); // If we are already listening on the key @pubsubKey then the callback function is overwritten @@ -57,7 +57,7 @@ namespace sibs std::vector> getPeers(); private: // Throws BootstrapConnectionException on error - BootstrapConnection(const Ipv4 &bootstrapAddress); + BootstrapConnection(const Ipv4 &bootstrapAddress, const ConnectionOptions &options); ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc); void receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size); diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 1ba8659..50fefb8 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -62,13 +62,18 @@ namespace sibs bool operator == (const DirectConnectionPeer &other) const; bool operator != (const DirectConnectionPeer &other) const; }; + + struct ConnectionOptions + { + bool useP2p = true; + }; class DirectConnections { DISABLE_COPY(DirectConnections) friend class BootstrapNode; public: - DirectConnections(u16 port = 0); + DirectConnections(u16 port = 0, const ConnectionOptions &options = ConnectionOptions()); ~DirectConnections(); // Throws ConnectionException on error @@ -103,6 +108,7 @@ namespace sibs PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback; Ipv4Map> connectionResults; std::mutex connectionResultsMutex; + ConnectionOptions options; struct UPNPUrlData { diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp index df0a947..fc2e4ab 100644 --- a/src/BootstrapConnection.cpp +++ b/src/BootstrapConnection.cpp @@ -8,7 +8,8 @@ namespace chrono = std::chrono; namespace sibs { - BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) : + BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress, const ConnectionOptions &options) : + connections(0, options), alive(true), putThreadCount(0) { @@ -54,22 +55,22 @@ namespace sibs } } - std::future> BootstrapConnection::connect(const Ipv4 &bootstrapAddress) + std::future> BootstrapConnection::connect(const Ipv4 &bootstrapAddress, const ConnectionOptions &options) { std::promise> connectionPromise; std::future> connectionFuture = connectionPromise.get_future(); - std::thread([bootstrapAddress](std::promise> connectionPromise) + std::thread([bootstrapAddress](std::promise> connectionPromise, const ConnectionOptions options) { try { - BootstrapConnection *connection = new BootstrapConnection(bootstrapAddress); + BootstrapConnection *connection = new BootstrapConnection(bootstrapAddress, options); connectionPromise.set_value(std::unique_ptr(connection)); } catch(...) { connectionPromise.set_exception(std::current_exception()); } - }, std::move(connectionPromise)).detach(); + }, std::move(connectionPromise), options).detach(); return connectionFuture; } diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp index ab58a48..6869d05 100644 --- a/src/DirectConnection.cpp +++ b/src/DirectConnection.cpp @@ -41,10 +41,11 @@ namespace sibs return !(*this == other); } - DirectConnections::DirectConnections(u16 _port) : + DirectConnections::DirectConnections(u16 _port, const ConnectionOptions &_options) : port(_port == 0 ? (u16)generateRandomNumber(2000, 32000) : _port), alive(true), removeDisconnectedPeerCallback(nullptr), + options(_options), upnpDevList(nullptr) { UDT::startup(); @@ -300,7 +301,7 @@ namespace sibs int socketId = socket->udtSocket; Log::debug("DirectConnections: Connecting to %s peer (ip: %s, port: %d, rendezvous: %s)", server ? "server" : "client", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no"); - if(UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) + if((!server && !options.useP2p) || UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR) { if(!server) { diff --git a/tests/main.cpp b/tests/main.cpp index 22bef9d..61f95f4 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -8,7 +8,7 @@ const int PORT = 35231; #define REQUIRE(expr) do { if(!(expr)) { fprintf(stderr, "Assert failed: %s\n", #expr); exit(1); } }while(0) -int main() +void testP2p() { const sibs::PubsubKey key("abcdefghijklmnopqrstuvxyz0123456789", 35); const sibs::PubsubKey key2("zbcdefghcjklmn3pqrs5uvx2z0123F56789", 35); @@ -66,6 +66,74 @@ int main() REQUIRE(gotListen); REQUIRE(gotListen2); +} + +void testRouted() +{ + sibs::ConnectionOptions options; + options.useP2p = false; + + const sibs::PubsubKey key("abcdefghijklmnopqrstuvxyz0123456789", 35); + const sibs::PubsubKey key2("zbcdefghcjklmn3pqrs5uvx2z0123F56789", 35); + sibs::BootstrapNode boostrapNode(sibs::Ipv4(nullptr, PORT)); + + std::unique_ptr connection1 = sibs::BootstrapConnection::connect(sibs::Ipv4("127.0.0.1", PORT), options).get(); + bool gotData1 = false; + bool gotAsdf1 = false; + connection1->listen(key, [&gotData1, &gotAsdf1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + { + if(size == 5 && strncmp((const char*)data, "hello", 5) == 0) + gotData1 = true; + if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0) + gotAsdf1 = true; + return true; + }); + + std::unique_ptr connection2 = sibs::BootstrapConnection::connect(sibs::Ipv4("127.0.0.1", PORT), options).get(); + bool gotData2 = false; + bool gotAsdf2 = false; + connection2->listen(key, [&gotData2, &gotAsdf2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + { + if(size == 5 && strncmp((const char*)data, "hello", 5) == 0) + gotData2 = true; + if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0) + gotAsdf2 = true; + return true; + }); + + bool gotListen = false; + connection1->listen(key2, [&gotListen](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + { + if(size == 14 && strncmp((const char*)data, "secondListener", 14) == 0) + gotListen = true; + return true; + }); + + bool gotListen2 = false; + connection2->listen(key2, [&gotListen2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size) + { + if(size == 14 && strncmp((const char*)data, "secondListener", 14) == 0) + gotListen2 = true; + return true; + }); + + connection1->put(key, "hello", 5); + connection2->put(key, "asdf", 4); + connection1->put(key2, "secondListener", 14); + std::this_thread::sleep_for(std::chrono::seconds(6)); + + REQUIRE(gotData1); + REQUIRE(gotAsdf1); + REQUIRE(gotData2); + REQUIRE(gotAsdf2); + + REQUIRE(gotListen); + REQUIRE(gotListen2); +} +int main() +{ + testP2p(); + testRouted(); return 0; } -- cgit v1.2.3