From eda94456add9a65d1821302e343bef4021d2a773 Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Tue, 16 Oct 2018 00:37:21 +0200 Subject: Reuse peer connection if subscribed to same key --- include/FnvHash.hpp | 15 +++++++++++++++ include/sibs/BootstrapConnection.hpp | 28 ++++++++++++++++++++-------- include/sibs/BootstrapNode.hpp | 5 +++-- include/sibs/DirectConnection.hpp | 30 ++++++++++++++++++++++++------ include/sibs/IpAddress.hpp | 13 +++++++++++++ include/sibs/Message.hpp | 30 ++++++++++++++++++++++++++++++ include/sibs/PubsubKey.hpp | 15 +++++---------- include/sibs/Socket.hpp | 19 +++++++++++++++++++ 8 files changed, 129 insertions(+), 26 deletions(-) create mode 100644 include/FnvHash.hpp create mode 100644 include/sibs/Message.hpp create mode 100644 include/sibs/Socket.hpp (limited to 'include') diff --git a/include/FnvHash.hpp b/include/FnvHash.hpp new file mode 100644 index 0000000..7766756 --- /dev/null +++ b/include/FnvHash.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "types.hpp" + +namespace sibs +{ + // Source: https://stackoverflow.com/a/11414104 (public license) + static usize fnvHash(const unsigned char *key, int len) + { + usize h = 2166136261ULL; + for (int i = 0; i < len; i++) + h = (h * 16777619ULL) ^ key[i]; + return h; + } +} \ No newline at end of file diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp index dd90b7d..08af775 100644 --- a/include/sibs/BootstrapConnection.hpp +++ b/include/sibs/BootstrapConnection.hpp @@ -19,8 +19,14 @@ namespace sibs PubsubKeyAlreadyListeningException(const std::string &errMsg) : std::runtime_error(errMsg) {} }; - // @peer is nullptr is data was sent by local user - using BoostrapConnectionListenCallbackFunc = std::function; + // @peer is nullptr is data was sent by local user. + // Return false if you want to stop listening on the key + using BoostrapConnectionListenCallbackFunc = std::function; + + struct ListenHandle + { + PubsubKey key; + }; class BootstrapConnection { @@ -30,17 +36,23 @@ namespace sibs BootstrapConnection(const Ipv4 &bootstrapAddress); // Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey - void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); - void put(const PubsubKey &pubsubKey, std::shared_ptr> data); + ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); + // Returns false if data is larger than 800kb. + // Note: @data is copied in this function. + // Note: You can't put data on a pubsubkey that you are not listening on. Call @listen first. + bool put(const PubsubKey &pubsubKey, const void *data, const usize size); + bool cancelListen(const ListenHandle &listener); + + std::vector> getPeers(); private: - void receiveDataFromServer(std::shared_ptr peer, const void *data, const usize size); - void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr peer, const void *data, const usize size); + void receiveDataFromServer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size); + void receiveDataFromPeer(std::shared_ptr peer, MessageType messageType, const void *data, const usize size); private: DirectConnections connections; std::shared_ptr serverPeer; PubsubKeyMap listenCallbackFuncs; PubsubKeyMap>> subscribedPeers; - std::mutex listenerCallbackFuncMutex; - std::mutex subscribedPeersMutex; + std::recursive_mutex listenerCallbackFuncMutex; + std::recursive_mutex subscribedPeersMutex; }; } diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp index ab3d6b3..c824a84 100644 --- a/include/sibs/BootstrapNode.hpp +++ b/include/sibs/BootstrapNode.hpp @@ -1,5 +1,6 @@ #pragma once +#include "Socket.hpp" #include "DirectConnection.hpp" #include "IpAddress.hpp" #include "PubsubKey.hpp" @@ -23,10 +24,10 @@ namespace sibs ~BootstrapNode(); private: void acceptConnections(); - void peerSubscribe(std::shared_ptr peer, const void *data, const usize size); + void messageFromClient(std::shared_ptr peer, MessageType messageType, const void *data, const usize size); private: DirectConnections connections; - int socket; + std::unique_ptr socket; std::thread acceptConnectionsThread; PubsubKeyMap>> subscribedPeers; std::mutex subscribedPeersMutex; diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 8e3865f..9be55f1 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -11,6 +11,8 @@ #include "IpAddress.hpp" #include "../types.hpp" #include "../utils.hpp" +#include "Socket.hpp" +#include "Message.hpp" namespace sibs { @@ -29,15 +31,19 @@ namespace sibs struct DirectConnectionPeer; using PubSubConnectCallback = std::function peer, PubSubResult result, const std::string &resultStr)>; - using PubSubReceiveDataCallback = std::function peer, const void *data, const usize size)>; + using PubSubReceiveDataCallback = std::function peer, MessageType messageType, const void *data, const usize size)>; using PubSubSendDataCallback = std::function; using PubSubOnRemoveDisconnectedPeerCallback = std::function peer)>; struct DirectConnectionPeer { - int socket; + std::unique_ptr socket; Ipv4 address; PubSubReceiveDataCallback receiveDataCallbackFunc; + int sharedKeys = 0; + + bool operator == (const DirectConnectionPeer &other) const; + bool operator != (const DirectConnectionPeer &other) const; }; class DirectConnections @@ -52,18 +58,22 @@ namespace sibs void connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc); // Throws ConnectionException on error void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc); - - void send(const std::shared_ptr peer, std::shared_ptr> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); + // Returns false if data is larger than 800kb + bool send(const std::shared_ptr peer, std::shared_ptr data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc); + bool removePeer(int peerSocket); + + std::vector> getPeers(); + + std::shared_ptr getPeerByAddress(const Ipv4 &address) const; protected: - int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true); + std::unique_ptr createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true); private: void connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc, bool bind); void removeDisconnectedPeers(); void receiveData(); int receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize); - bool removePeer(int peerSocket); private: u16 port; int eid; @@ -72,5 +82,13 @@ namespace sibs std::mutex peersMutex; bool alive; PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback; + Ipv4Map> peerByAddressMap; + }; + + struct DirectConnectionsUtils + { + static std::vector serializePeers(const std::vector> &peers); + // Throws DeserializeException on error + static std::vector> deserializePeers(const u8 *data, const usize size); }; } diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp index c3b43c4..4403e83 100644 --- a/include/sibs/IpAddress.hpp +++ b/include/sibs/IpAddress.hpp @@ -2,6 +2,7 @@ #include #include +#include #ifndef WIN32 #include #include @@ -32,7 +33,19 @@ namespace sibs unsigned short getPort() const; bool operator == (const Ipv4 &other) const; + bool operator != (const Ipv4 &other) const; struct sockaddr_in address; }; + + struct Ipv4Hasher + { + size_t operator()(const Ipv4 &address) const + { + return address.address.sin_addr.s_addr ^ address.address.sin_port; + } + }; + + template + using Ipv4Map = std::unordered_map; } diff --git a/include/sibs/Message.hpp b/include/sibs/Message.hpp new file mode 100644 index 0000000..b6eb858 --- /dev/null +++ b/include/sibs/Message.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "../types.hpp" +#include + +namespace sibs +{ + enum class MessageType : u8 + { + NONE, + DATA, + SUBSCRIBE, + UNSUBSCRIBE + }; + + class Message + { + public: + Message(MessageType messageType); + + void append(const void *data, const usize size); + + usize getDataSize() const { return rawData.size() - 1; } + usize getRawSize() const { return rawData.size(); } + + const u8* data() const { return rawData.data(); } + private: + std::vector rawData; + }; +} \ No newline at end of file diff --git a/include/sibs/PubsubKey.hpp b/include/sibs/PubsubKey.hpp index d123331..f1239ca 100644 --- a/include/sibs/PubsubKey.hpp +++ b/include/sibs/PubsubKey.hpp @@ -1,21 +1,14 @@ #pragma once +#include "../FnvHash.hpp" #include "../types.hpp" #include #include +#include namespace sibs { - const usize PUBSUB_KEY_LENGTH = 32; - - // Source: https://stackoverflow.com/a/11414104 (public license) - static size_t fnvHash(const unsigned char *key, int len) - { - size_t h = 2166136261; - for (int i = 0; i < len; i++) - h = (h * 16777619) ^ key[i]; - return h; - } + const usize PUBSUB_KEY_LENGTH = 20; class PubsubKey { @@ -26,6 +19,8 @@ namespace sibs bool operator == (const PubsubKey &other) const; bool operator != (const PubsubKey &other) const; + std::string toString() const; + std::array data; }; diff --git a/include/sibs/Socket.hpp b/include/sibs/Socket.hpp new file mode 100644 index 0000000..0bc9ec3 --- /dev/null +++ b/include/sibs/Socket.hpp @@ -0,0 +1,19 @@ +#pragma once + +namespace sibs +{ + class Socket + { + public: + Socket(); + Socket(int udtSocket); + Socket(int eid, int udtSocket); + Socket(Socket &&other); + Socket(const Socket&) = delete; + Socket& operator = (const Socket&) = delete; + ~Socket(); + + int eid; + int udtSocket; + }; +} \ No newline at end of file -- cgit v1.2.3