1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
#include "../include/sibs/BootstrapNode.hpp"
#include "../include/Log.hpp"
#ifndef WIN32
#include <arpa/inet.h>
#include <netdb.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
#include <udt/udt.h>
#include <sibs/SafeSerializer.hpp>
#include <sibs/SafeDeserializer.hpp>
namespace sibs
{
BootstrapNode::BootstrapNode(const Ipv4 &address) :
connections(27130),
socket(connections.createSocket(address, false, true))
{
if(UDT::listen(socket, 10) == UDT::ERROR)
{
std::string errMsg = "UDT: Failed to listen, error: ";
errMsg += UDT::getlasterror_desc();
throw BootstrapException(errMsg);
}
acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this);
}
BootstrapNode::~BootstrapNode()
{
connections.alive = false;
UDT::close(socket);
acceptConnectionsThread.join();
}
void BootstrapNode::acceptConnections()
{
sockaddr_storage clientAddr;
int addrLen = sizeof(clientAddr);
while(connections.alive)
{
UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen);
if(clientSocket == UDT::INVALID_SOCK)
{
// Connection was killed because bootstrap node was taken down
if(!connections.alive)
return;
Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc());
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
char clientHost[NI_MAXHOST];
char clientService[NI_MAXSERV];
getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket);
UDT::epoll_add_usock(connections.eid, clientSocket);
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
peer->socket = clientSocket;
sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr;
memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
connections.peersMutex.lock();
connections.peers[clientSocket] = peer;
connections.peersMutex.unlock();
}
}
void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
{
Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", newPeer->address.getAddress().c_str(), newPeer->address.getPort());
sibs::SafeDeserializer deserializer((const u8*)data, size);
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
auto &peers = subscribedPeers[pubsubKey];
for(auto &peer : peers)
{
if(peer->address == newPeer->address)
return;
}
sibs::SafeSerializer serializer;
serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
serializer.add((u32)newPeer->address.address.sin_family);
serializer.add((u32)newPeer->address.address.sin_addr.s_addr);
serializer.add((u16)newPeer->address.address.sin_port);
std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));
auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
{
Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str());
};
sibs::SafeSerializer newPeerSerializer;
newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
for(auto &peer : peers)
{
connections.send(peer, serializerData, sendCallbackFunc);
newPeerSerializer.add((u32)peer->address.address.sin_family);
newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr);
newPeerSerializer.add((u16)peer->address.address.sin_port);
}
peers.push_back(newPeer);
connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())), sendCallbackFunc);
}
}
|