aboutsummaryrefslogtreecommitdiff
path: root/include
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-16 00:37:21 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-16 00:37:25 +0200
commiteda94456add9a65d1821302e343bef4021d2a773 (patch)
tree237e09b79c0e8b853b0892deca8b67713a8c4634 /include
parent29d93a061a8fcc36e5d7afd1dbcd0a6fefbbabaa (diff)
Reuse peer connection if subscribed to same key
Diffstat (limited to 'include')
-rw-r--r--include/FnvHash.hpp15
-rw-r--r--include/sibs/BootstrapConnection.hpp28
-rw-r--r--include/sibs/BootstrapNode.hpp5
-rw-r--r--include/sibs/DirectConnection.hpp30
-rw-r--r--include/sibs/IpAddress.hpp13
-rw-r--r--include/sibs/Message.hpp30
-rw-r--r--include/sibs/PubsubKey.hpp15
-rw-r--r--include/sibs/Socket.hpp19
8 files changed, 129 insertions, 26 deletions
diff --git a/include/FnvHash.hpp b/include/FnvHash.hpp
new file mode 100644
index 0000000..7766756
--- /dev/null
+++ b/include/FnvHash.hpp
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "types.hpp"
+
+namespace sibs
+{
+ // Source: https://stackoverflow.com/a/11414104 (public license)
+ static usize fnvHash(const unsigned char *key, int len)
+ {
+ usize h = 2166136261ULL;
+ for (int i = 0; i < len; i++)
+ h = (h * 16777619ULL) ^ key[i];
+ return h;
+ }
+} \ No newline at end of file
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp
index dd90b7d..08af775 100644
--- a/include/sibs/BootstrapConnection.hpp
+++ b/include/sibs/BootstrapConnection.hpp
@@ -19,8 +19,14 @@ namespace sibs
PubsubKeyAlreadyListeningException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
- // @peer is nullptr is data was sent by local user
- using BoostrapConnectionListenCallbackFunc = std::function<void(const DirectConnectionPeer *peer, const void *data, const usize size)>;
+ // @peer is nullptr is data was sent by local user.
+ // Return false if you want to stop listening on the key
+ using BoostrapConnectionListenCallbackFunc = std::function<bool(const DirectConnectionPeer *peer, const void *data, const usize size)>;
+
+ struct ListenHandle
+ {
+ PubsubKey key;
+ };
class BootstrapConnection
{
@@ -30,17 +36,23 @@ namespace sibs
BootstrapConnection(const Ipv4 &bootstrapAddress);
// Throws PubsubKeyAlreadyListeningException if we are already listening on the key @pubsubKey
- void listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc);
- void put(const PubsubKey &pubsubKey, std::shared_ptr<std::vector<u8>> data);
+ ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc);
+ // Returns false if data is larger than 800kb.
+ // Note: @data is copied in this function.
+ // Note: You can't put data on a pubsubkey that you are not listening on. Call @listen first.
+ bool put(const PubsubKey &pubsubKey, const void *data, const usize size);
+ bool cancelListen(const ListenHandle &listener);
+
+ std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers();
private:
- void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
- void receiveDataFromPeer(BoostrapConnectionListenCallbackFunc listenCallbackFunc, std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
+ void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
+ void receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
private:
DirectConnections connections;
std::shared_ptr<DirectConnectionPeer> serverPeer;
PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs;
PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
- std::mutex listenerCallbackFuncMutex;
- std::mutex subscribedPeersMutex;
+ std::recursive_mutex listenerCallbackFuncMutex;
+ std::recursive_mutex subscribedPeersMutex;
};
}
diff --git a/include/sibs/BootstrapNode.hpp b/include/sibs/BootstrapNode.hpp
index ab3d6b3..c824a84 100644
--- a/include/sibs/BootstrapNode.hpp
+++ b/include/sibs/BootstrapNode.hpp
@@ -1,5 +1,6 @@
#pragma once
+#include "Socket.hpp"
#include "DirectConnection.hpp"
#include "IpAddress.hpp"
#include "PubsubKey.hpp"
@@ -23,10 +24,10 @@ namespace sibs
~BootstrapNode();
private:
void acceptConnections();
- void peerSubscribe(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size);
+ void messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
private:
DirectConnections connections;
- int socket;
+ std::unique_ptr<Socket> socket;
std::thread acceptConnectionsThread;
PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
std::mutex subscribedPeersMutex;
diff --git a/include/sibs/DirectConnection.hpp b/include/sibs/DirectConnection.hpp
index 8e3865f..9be55f1 100644
--- a/include/sibs/DirectConnection.hpp
+++ b/include/sibs/DirectConnection.hpp
@@ -11,6 +11,8 @@
#include "IpAddress.hpp"
#include "../types.hpp"
#include "../utils.hpp"
+#include "Socket.hpp"
+#include "Message.hpp"
namespace sibs
{
@@ -29,15 +31,19 @@ namespace sibs
struct DirectConnectionPeer;
using PubSubConnectCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)>;
- using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, const void *data, const usize size)>;
+ using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)>;
using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>;
using PubSubOnRemoveDisconnectedPeerCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer)>;
struct DirectConnectionPeer
{
- int socket;
+ std::unique_ptr<Socket> socket;
Ipv4 address;
PubSubReceiveDataCallback receiveDataCallbackFunc;
+ int sharedKeys = 0;
+
+ bool operator == (const DirectConnectionPeer &other) const;
+ bool operator != (const DirectConnectionPeer &other) const;
};
class DirectConnections
@@ -52,18 +58,22 @@ namespace sibs
void connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc);
// Throws ConnectionException on error
void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc);
-
- void send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<std::vector<u8>> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr);
+ // Returns false if data is larger than 800kb
+ bool send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<Message> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr);
void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc);
+ bool removePeer(int peerSocket);
+
+ std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers();
+
+ std::shared_ptr<DirectConnectionPeer> getPeerByAddress(const Ipv4 &address) const;
protected:
- int createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true);
+ std::unique_ptr<Socket> createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true);
private:
void connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc, bool bind);
void removeDisconnectedPeers();
void receiveData();
int receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize);
- bool removePeer(int peerSocket);
private:
u16 port;
int eid;
@@ -72,5 +82,13 @@ namespace sibs
std::mutex peersMutex;
bool alive;
PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback;
+ Ipv4Map<std::shared_ptr<DirectConnectionPeer>> peerByAddressMap;
+ };
+
+ struct DirectConnectionsUtils
+ {
+ static std::vector<u8> serializePeers(const std::vector<std::shared_ptr<DirectConnectionPeer>> &peers);
+ // Throws DeserializeException on error
+ static std::vector<std::shared_ptr<DirectConnectionPeer>> deserializePeers(const u8 *data, const usize size);
};
}
diff --git a/include/sibs/IpAddress.hpp b/include/sibs/IpAddress.hpp
index c3b43c4..4403e83 100644
--- a/include/sibs/IpAddress.hpp
+++ b/include/sibs/IpAddress.hpp
@@ -2,6 +2,7 @@
#include <stdexcept>
#include <string>
+#include <unordered_map>
#ifndef WIN32
#include <arpa/inet.h>
#include <netdb.h>
@@ -32,7 +33,19 @@ namespace sibs
unsigned short getPort() const;
bool operator == (const Ipv4 &other) const;
+ bool operator != (const Ipv4 &other) const;
struct sockaddr_in address;
};
+
+ struct Ipv4Hasher
+ {
+ size_t operator()(const Ipv4 &address) const
+ {
+ return address.address.sin_addr.s_addr ^ address.address.sin_port;
+ }
+ };
+
+ template <typename ValueType>
+ using Ipv4Map = std::unordered_map<Ipv4, ValueType, Ipv4Hasher>;
}
diff --git a/include/sibs/Message.hpp b/include/sibs/Message.hpp
new file mode 100644
index 0000000..b6eb858
--- /dev/null
+++ b/include/sibs/Message.hpp
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "../types.hpp"
+#include <vector>
+
+namespace sibs
+{
+ enum class MessageType : u8
+ {
+ NONE,
+ DATA,
+ SUBSCRIBE,
+ UNSUBSCRIBE
+ };
+
+ class Message
+ {
+ public:
+ Message(MessageType messageType);
+
+ void append(const void *data, const usize size);
+
+ usize getDataSize() const { return rawData.size() - 1; }
+ usize getRawSize() const { return rawData.size(); }
+
+ const u8* data() const { return rawData.data(); }
+ private:
+ std::vector<u8> rawData;
+ };
+} \ No newline at end of file
diff --git a/include/sibs/PubsubKey.hpp b/include/sibs/PubsubKey.hpp
index d123331..f1239ca 100644
--- a/include/sibs/PubsubKey.hpp
+++ b/include/sibs/PubsubKey.hpp
@@ -1,21 +1,14 @@
#pragma once
+#include "../FnvHash.hpp"
#include "../types.hpp"
#include <array>
#include <unordered_map>
+#include <string>
namespace sibs
{
- const usize PUBSUB_KEY_LENGTH = 32;
-
- // Source: https://stackoverflow.com/a/11414104 (public license)
- static size_t fnvHash(const unsigned char *key, int len)
- {
- size_t h = 2166136261;
- for (int i = 0; i < len; i++)
- h = (h * 16777619) ^ key[i];
- return h;
- }
+ const usize PUBSUB_KEY_LENGTH = 20;
class PubsubKey
{
@@ -26,6 +19,8 @@ namespace sibs
bool operator == (const PubsubKey &other) const;
bool operator != (const PubsubKey &other) const;
+ std::string toString() const;
+
std::array<u8, PUBSUB_KEY_LENGTH> data;
};
diff --git a/include/sibs/Socket.hpp b/include/sibs/Socket.hpp
new file mode 100644
index 0000000..0bc9ec3
--- /dev/null
+++ b/include/sibs/Socket.hpp
@@ -0,0 +1,19 @@
+#pragma once
+
+namespace sibs
+{
+ class Socket
+ {
+ public:
+ Socket();
+ Socket(int udtSocket);
+ Socket(int eid, int udtSocket);
+ Socket(Socket &&other);
+ Socket(const Socket&) = delete;
+ Socket& operator = (const Socket&) = delete;
+ ~Socket();
+
+ int eid;
+ int udtSocket;
+ };
+} \ No newline at end of file