diff options
author | dec05eba <dec05eba@protonmail.com> | 2018-10-16 00:37:21 +0200 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-08-18 22:56:48 +0200 |
commit | c47870421f189eb98fc66e912693d73fbd8477ee (patch) | |
tree | 036ead590fa17bef279de483489a880c54ef4ba1 /include/sibs | |
parent | 0c1b3db7c4d9a4bcde4160c437613b32cd4081d6 (diff) |
Reuse peer connection if subscribed to same key
Diffstat (limited to 'include/sibs')
-rw-r--r-- | include/sibs/BootstrapConnection.hpp | 28 | ||||
-rw-r--r-- | include/sibs/BootstrapNode.hpp | 5 | ||||
-rw-r--r-- | include/sibs/DirectConnection.hpp | 30 | ||||
-rw-r--r-- | include/sibs/IpAddress.hpp | 13 | ||||
-rw-r--r-- | include/sibs/Message.hpp | 30 | ||||
-rw-r--r-- | include/sibs/PubsubKey.hpp | 15 | ||||
-rw-r--r-- | include/sibs/Socket.hpp | 19 |
7 files changed, 114 insertions, 26 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp index dd90b7d..08af775 100644 --- a/include/sibs/BootstrapConnection.hpp +++ b/include/sibs/BootstrapConnection.hpp @@ -19,8 +19,14 @@ namespace sibs PubsubKeyAlreadyListeningException(const std::string &errMsg) : std::runtime_error(errMsg) {} }; - // @peer is nullptr is data was sent by local user - using BoostrapConnectionListenCallbackFunc = std::function<void(const DirectConnectionPeer *peer, const void *data, const usize size)>; + // @peer is nullptr is data was sent by local user. + // Return false if you want to stop listening on the key + using BoostrapConnectionListenCallbackFunc = std::function<bool(const DirectConnectionPeer *peer, const void *data, const usize size)>; + + struct ListenHandle + { + PubsubKey key; + }; class BootstrapConnection { @@ -30,17 +36,23 @@ namespace sibs BootstrapConnection(const Ipv4 &bootstrapAddress); // Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey - void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); - void put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data); + ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc); + // Returns false if data is larger than 800kb. + // Note: @data is copied in this function. + // Note: You can't put data on a pubsubkey that you are not listening on. Call @listen first. + bool put(const PubsubKey &pubsubKey, const void *data, const usize size); + bool cancelListen(const ListenHandle &listener); + + std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers(); private: - void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); - void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); + void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size); + void receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size); private: DirectConnections connections; std::shared_ptr<DirectConnectionPeer> serverPeer; PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs; PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers; - std::mutex listenerCallbackFuncMutex; - std::mutex subscribedPeersMutex; + std::recursive_mutex listenerCallbackFuncMutex; + std::recursive_mutex subscribedPeersMutex; }; } diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp index ab3d6b3..c824a84 100644 --- a/include/sibs/BootstrapNode.hpp +++ b/include/sibs/BootstrapNode.hpp @@ -1,5 +1,6 @@ #pragma once +#include "Socket.hpp" #include "DirectConnection.hpp" #include "IpAddress.hpp" #include "PubsubKey.hpp" @@ -23,10 +24,10 @@ namespace sibs ~BootstrapNode(); private: void acceptConnections(); - void peerSubscribe(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size); + void messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size); private: DirectConnections connections; - int socket; + std::unique_ptr<Socket> socket; std::thread acceptConnectionsThread; PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers; std::mutex subscribedPeersMutex; diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp index 8e3865f..9be55f1 100644 --- a/include/sibs/DirectConnection.hpp +++ b/include/sibs/DirectConnection.hpp @@ -11,6 +11,8 @@ #include "IpAddress.hpp" #include "../types.hpp" #include "../utils.hpp" +#include "Socket.hpp" +#include "Message.hpp" namespace sibs { @@ -29,15 +31,19 @@ namespace sibs struct DirectConnectionPeer; using PubSubConnectCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)>; - using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>; + using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)>; using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>; using PubSubOnRemoveDisconnectedPeerCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer)>; struct DirectConnectionPeer { - int socket; + std::unique_ptr<Socket> socket; Ipv4 address; PubSubReceiveDataCallback receiveDataCallbackFunc; + int sharedKeys = 0; + + bool operator == (const DirectConnectionPeer &other) const; + bool operator != (const DirectConnectionPeer &other) const; }; class DirectConnections @@ -52,18 +58,22 @@ namespace sibs void connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc); // Throws ConnectionException on error void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc); - - void send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); + // Returns false if data is larger than 800kb + bool send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<Message> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr); void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc); + bool removePeer(int peerSocket); + + std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers(); + + std::shared_ptr<DirectConnectionPeer> getPeerByAddress(const Ipv4 &address) const; protected: - int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true); + std::unique_ptr<Socket> createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true); private: void connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc, bool bind); void removeDisconnectedPeers(); void receiveData(); int receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize); - bool removePeer(int peerSocket); private: u16 port; int eid; @@ -72,5 +82,13 @@ namespace sibs std::mutex peersMutex; bool alive; PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback; + Ipv4Map<std::shared_ptr<DirectConnectionPeer>> peerByAddressMap; + }; + + struct DirectConnectionsUtils + { + static std::vector<u8> serializePeers(const std::vector<std::shared_ptr<DirectConnectionPeer>> &peers); + // Throws DeserializeException on error + static std::vector<std::shared_ptr<DirectConnectionPeer>> deserializePeers(const u8 *data, const usize size); }; } diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp index c3b43c4..4403e83 100644 --- a/include/sibs/IpAddress.hpp +++ b/include/sibs/IpAddress.hpp @@ -2,6 +2,7 @@ #include <stdexcept> #include <string> +#include <unordered_map> #ifndef WIN32 #include <arpa/inet.h> #include <netdb.h> @@ -32,7 +33,19 @@ namespace sibs unsigned short getPort() const; bool operator == (const Ipv4 &other) const; + bool operator != (const Ipv4 &other) const; struct sockaddr_in address; }; + + struct Ipv4Hasher + { + size_t operator()(const Ipv4 &address) const + { + return address.address.sin_addr.s_addr ^ address.address.sin_port; + } + }; + + template <typename ValueType> + using Ipv4Map = std::unordered_map<Ipv4, ValueType, Ipv4Hasher>; } diff --git a/include/sibs/Message.hpp b/include/sibs/Message.hpp new file mode 100644 index 0000000..b6eb858 --- /dev/null +++ b/include/sibs/Message.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include "../types.hpp" +#include <vector> + +namespace sibs +{ + enum class MessageType : u8 + { + NONE, + DATA, + SUBSCRIBE, + UNSUBSCRIBE + }; + + class Message + { + public: + Message(MessageType messageType); + + void append(const void *data, const usize size); + + usize getDataSize() const { return rawData.size() - 1; } + usize getRawSize() const { return rawData.size(); } + + const u8* data() const { return rawData.data(); } + private: + std::vector<u8> rawData; + }; +}
\ No newline at end of file diff --git a/include/sibs/PubsubKey.hpp b/include/sibs/PubsubKey.hpp index d123331..f1239ca 100644 --- a/include/sibs/PubsubKey.hpp +++ b/include/sibs/PubsubKey.hpp @@ -1,21 +1,14 @@ #pragma once +#include "../FnvHash.hpp" #include "../types.hpp" #include <array> #include <unordered_map> +#include <string> namespace sibs { - const usize PUBSUB_KEY_LENGTH = 32; - - // Source: https://stackoverflow.com/a/11414104 (public license) - static size_t fnvHash(const unsigned char *key, int len) - { - size_t h = 2166136261; - for (int i = 0; i < len; i++) - h = (h * 16777619) ^ key[i]; - return h; - } + const usize PUBSUB_KEY_LENGTH = 20; class PubsubKey { @@ -26,6 +19,8 @@ namespace sibs bool operator == (const PubsubKey &other) const; bool operator != (const PubsubKey &other) const; + std::string toString() const; + std::array<u8, PUBSUB_KEY_LENGTH> data; }; diff --git a/include/sibs/Socket.hpp b/include/sibs/Socket.hpp new file mode 100644 index 0000000..0bc9ec3 --- /dev/null +++ b/include/sibs/Socket.hpp @@ -0,0 +1,19 @@ +#pragma once + +namespace sibs +{ + class Socket + { + public: + Socket(); + Socket(int udtSocket); + Socket(int eid, int udtSocket); + Socket(Socket &&other); + Socket(const Socket&) = delete; + Socket& operator = (const Socket&) = delete; + ~Socket(); + + int eid; + int udtSocket; + }; +}
\ No newline at end of file |