1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#pragma once
#include <unordered_map>
#include <functional>
#include <string>
#include <memory>
#include <thread>
#include <mutex>
#include <vector>
#include <stdexcept>
#include "IpAddress.hpp"
#include "../types.hpp"
#include "../utils.hpp"
#include "Socket.hpp"
#include "Message.hpp"
#include <future>
namespace sibs
{
class SocketCreateException : public std::runtime_error
{
public:
SocketCreateException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
enum class PubSubResult
{
RESULT_OK,
RESULT_ERROR
};
struct DirectConnectionPeer;
struct PubSubConnectResult
{
std::shared_ptr<DirectConnectionPeer> peer;
PubSubResult result;
std::string resultStr;
};
using PubSubConnectCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)>;
using PubSubReceiveDataCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)>;
using PubSubSendDataCallback = std::function<void(PubSubResult result, const std::string &resultStr)>;
using PubSubOnRemoveDisconnectedPeerCallback = std::function<void(std::shared_ptr<DirectConnectionPeer> peer)>;
enum class PeerType
{
SERVER,
CLIENT
};
struct DirectConnectionPeer
{
std::unique_ptr<Socket> socket;
Ipv4 address;
PubSubReceiveDataCallback receiveDataCallbackFunc;
int sharedKeys = 0;
PeerType type = PeerType::CLIENT;
bool routed = false;
bool operator == (const DirectConnectionPeer &other) const;
bool operator != (const DirectConnectionPeer &other) const;
};
class DirectConnections
{
DISABLE_COPY(DirectConnections)
friend class BootstrapNode;
public:
DirectConnections(u16 port = 0);
~DirectConnections();
// Throws ConnectionException on error
void connectServer(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc);
// Throws ConnectionException on error
void connect(const Ipv4 &address, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc);
// Returns false if data is larger than 800kb
bool send(const std::shared_ptr<DirectConnectionPeer> peer, std::shared_ptr<Message> data, PubSubSendDataCallback sendDataCallbackFunc = nullptr);
void onRemoveDisconnectedPeer(PubSubOnRemoveDisconnectedPeerCallback callbackFunc);
bool removePeer(int peerSocket);
std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers();
protected:
std::unique_ptr<Socket> createSocket(const Ipv4 &addressToBind, bool rendezvous, bool reuseAddr, bool bind = true);
private:
void connect(const Ipv4 &address, bool rendezvous, bool reuseAddr, PubSubConnectCallback connectCallbackFunc, PubSubReceiveDataCallback receiveDataCallbackFunc, bool bind, bool server);
void removeDisconnectedPeers();
void receiveData();
int receiveDataFromPeer(const int socket, char *output, usize *receivedTotalSize);
private:
u16 port;
int eid;
std::unordered_map<int, std::shared_ptr<DirectConnectionPeer>> peers;
std::thread receiveDataThread;
std::mutex peersMutex;
bool alive;
PubSubOnRemoveDisconnectedPeerCallback removeDisconnectedPeerCallback;
Ipv4Map<std::shared_future<PubSubConnectResult>> connectionResults;
std::mutex connectionResultsMutex;
};
struct DirectConnectionsUtils
{
static std::vector<u8> serializePeers(const std::vector<std::shared_ptr<DirectConnectionPeer>> &peers);
// Throws DeserializeException on error
static std::vector<std::shared_ptr<DirectConnectionPeer>> deserializePeers(const u8 *data, const usize size);
};
}
|