aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-12-19 23:04:50 +0100
committerdec05eba <dec05eba@protonmail.com>2020-08-18 22:56:48 +0200
commitcc01e19e8f3a8bbb8db7d3103fcec16854b11626 (patch)
tree187824e438b5ab49b5d58308abff57686ed0bacb
parent39f3e2049813d2556309b4ee2064287b5781415f (diff)
Add option to force route data
-rw-r--r--README.md3
-rw-r--r--include/sibs/BootstrapConnection.hpp4
-rw-r--r--include/sibs/DirectConnection.hpp8
-rw-r--r--src/BootstrapConnection.cpp11
-rw-r--r--src/DirectConnection.cpp5
-rw-r--r--tests/main.cpp70
6 files changed, 90 insertions, 11 deletions
diff --git a/README.md b/README.md
index 3dd0713..28ba522 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,5 @@
# sibs pubsub
Publish/subscription using UDT for connection/data transfer
+
+# TODO
+When routing data and remote peer disconnects, perform unsubscribe through route (bootstrap node) \ No newline at end of file
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp
index 1c5c1a5..c63256a 100644
--- a/include/sibs/BootstrapConnection.hpp
+++ b/include/sibs/BootstrapConnection.hpp
@@ -42,7 +42,7 @@ namespace sibs
DISABLE_COPY(BootstrapConnection)
public:
// Throws BootstrapConnectionException on error
- static std::future<std::unique_ptr<BootstrapConnection>> connect(const Ipv4 &bootstrapAddress);
+ static std::future<std::unique_ptr<BootstrapConnection>> connect(const Ipv4 &bootstrapAddress, const ConnectionOptions &options = ConnectionOptions());
~BootstrapConnection();
// If we are already listening on the key @pubsubKey then the callback function is overwritten
@@ -57,7 +57,7 @@ namespace sibs
std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers();
private:
// Throws BootstrapConnectionException on error
- BootstrapConnection(const Ipv4 &bootstrapAddress);
+ BootstrapConnection(const Ipv4 &bootstrapAddress, const ConnectionOptions &options);
ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc);
void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index 1ba8659..50fefb8 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -62,13 +62,18 @@ namespace sibs
bool operator == (const DirectConnectionPeer &other) const;
bool operator != (const DirectConnectionPeer &other) const;
};
+
+ struct ConnectionOptions
+ {
+ bool useP2p = true;
+ };
class DirectConnections
{
DISABLE_COPY(DirectConnections)
friend class BootstrapNode;
public:
- DirectConnections(u16 port = 0);
+ DirectConnections(u16 port = 0, const ConnectionOptions &options = ConnectionOptions());
~DirectConnections();
// Throws ConnectionException on error
@@ -103,6 +108,7 @@ namespace sibs
PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback;
Ipv4Map<std::shared_future<PubSubConnectResult>> connectionResults;
std::mutex connectionResultsMutex;
+ ConnectionOptions options;
struct UPNPUrlData
{
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index df0a947..fc2e4ab 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -8,7 +8,8 @@ namespace chrono = std::chrono;
namespace sibs
{
- BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) :
+ BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress, const ConnectionOptions &options) :
+ connections(0, options),
alive(true),
putThreadCount(0)
{
@@ -54,22 +55,22 @@ namespace sibs
}
}
- std::future<std::unique_ptr<BootstrapConnection>> BootstrapConnection::connect(const Ipv4 &bootstrapAddress)
+ std::future<std::unique_ptr<BootstrapConnection>> BootstrapConnection::connect(const Ipv4 &bootstrapAddress, const ConnectionOptions &options)
{
std::promise<std::unique_ptr<BootstrapConnection>> connectionPromise;
std::future<std::unique_ptr<BootstrapConnection>> connectionFuture = connectionPromise.get_future();
- std::thread([bootstrapAddress](std::promise<std::unique_ptr<BootstrapConnection>> connectionPromise)
+ std::thread([bootstrapAddress](std::promise<std::unique_ptr<BootstrapConnection>> connectionPromise, const ConnectionOptions options)
{
try
{
- BootstrapConnection *connection = new BootstrapConnection(bootstrapAddress);
+ BootstrapConnection *connection = new BootstrapConnection(bootstrapAddress, options);
connectionPromise.set_value(std::unique_ptr<BootstrapConnection>(connection));
}
catch(...)
{
connectionPromise.set_exception(std::current_exception());
}
- }, std::move(connectionPromise)).detach();
+ }, std::move(connectionPromise), options).detach();
return connectionFuture;
}
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index ab58a48..6869d05 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -41,10 +41,11 @@ namespace sibs
return !(*this == other);
}
- DirectConnections::DirectConnections(u16 _port) :
+ DirectConnections::DirectConnections(u16 _port, const ConnectionOptions &_options) :
port(_port == 0 ? (u16)generateRandomNumber(2000, 32000) : _port),
alive(true),
removeDisconnectedPeerCallback(nullptr),
+ options(_options),
upnpDevList(nullptr)
{
UDT::startup();
@@ -300,7 +301,7 @@ namespace sibs
int socketId = socket->udtSocket;
Log::debug("DirectConnections: Connecting to %s peer (ip: %s, port: %d, rendezvous: %s)", server ? "server" : "client", address.getAddress().c_str(), address.getPort(), rendezvous ? "yes" : "no");
- if(UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
+ if((!server && !options.useP2p) || UDT::connect(socketId, (sockaddr*)&address.address, sizeof(address.address)) == UDT::ERROR)
{
if(!server)
{
diff --git a/tests/main.cpp b/tests/main.cpp
index 22bef9d..61f95f4 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -8,7 +8,7 @@ const int PORT = 35231;
#define REQUIRE(expr) do { if(!(expr)) { fprintf(stderr, "Assert failed: %s\n", #expr); exit(1); } }while(0)
-int main()
+void testP2p()
{
const sibs::PubsubKey key("abcdefghijklmnopqrstuvxyz0123456789", 35);
const sibs::PubsubKey key2("zbcdefghcjklmn3pqrs5uvx2z0123F56789", 35);
@@ -66,6 +66,74 @@ int main()
REQUIRE(gotListen);
REQUIRE(gotListen2);
+}
+
+void testRouted()
+{
+ sibs::ConnectionOptions options;
+ options.useP2p = false;
+
+ const sibs::PubsubKey key("abcdefghijklmnopqrstuvxyz0123456789", 35);
+ const sibs::PubsubKey key2("zbcdefghcjklmn3pqrs5uvx2z0123F56789", 35);
+ sibs::BootstrapNode boostrapNode(sibs::Ipv4(nullptr, PORT));
+
+ std::unique_ptr<sibs::BootstrapConnection> connection1 = sibs::BootstrapConnection::connect(sibs::Ipv4("127.0.0.1", PORT), options).get();
+ bool gotData1 = false;
+ bool gotAsdf1 = false;
+ connection1->listen(key, [&gotData1, &gotAsdf1](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ {
+ if(size == 5 && strncmp((const char*)data, "hello", 5) == 0)
+ gotData1 = true;
+ if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0)
+ gotAsdf1 = true;
+ return true;
+ });
+
+ std::unique_ptr<sibs::BootstrapConnection> connection2 = sibs::BootstrapConnection::connect(sibs::Ipv4("127.0.0.1", PORT), options).get();
+ bool gotData2 = false;
+ bool gotAsdf2 = false;
+ connection2->listen(key, [&gotData2, &gotAsdf2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ {
+ if(size == 5 && strncmp((const char*)data, "hello", 5) == 0)
+ gotData2 = true;
+ if(size == 4 && strncmp((const char*)data, "asdf", 4) == 0)
+ gotAsdf2 = true;
+ return true;
+ });
+
+ bool gotListen = false;
+ connection1->listen(key2, [&gotListen](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ {
+ if(size == 14 && strncmp((const char*)data, "secondListener", 14) == 0)
+ gotListen = true;
+ return true;
+ });
+
+ bool gotListen2 = false;
+ connection2->listen(key2, [&gotListen2](const sibs::DirectConnectionPeer *peer, const void *data, const sibs::usize size)
+ {
+ if(size == 14 && strncmp((const char*)data, "secondListener", 14) == 0)
+ gotListen2 = true;
+ return true;
+ });
+
+ connection1->put(key, "hello", 5);
+ connection2->put(key, "asdf", 4);
+ connection1->put(key2, "secondListener", 14);
+ std::this_thread::sleep_for(std::chrono::seconds(6));
+
+ REQUIRE(gotData1);
+ REQUIRE(gotAsdf1);
+ REQUIRE(gotData2);
+ REQUIRE(gotAsdf2);
+
+ REQUIRE(gotListen);
+ REQUIRE(gotListen2);
+}
+int main()
+{
+ testP2p();
+ testRouted();
return 0;
}