aboutsummaryrefslogtreecommitdiff
path: root/src/BootstrapNode.cpp
blob: 9edf008e52a4babcd7108bf729d991cb4be19d72 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include "../include/sibs/BootstrapNode.hpp"
#include "../include/Log.hpp"
#ifndef WIN32
   #include <arpa/inet.h>
   #include <netdb.h>
#else
   #include <winsock2.h>
   #include <ws2tcpip.h>
#endif
#include <mutex>
#include <udt/udt.h>
#include <sibs/SafeSerializer.hpp>
#include <sibs/SafeDeserializer.hpp>

namespace sibs
{
    /**
     * Creates the listening socket for @p address through connections.createSocket,
     * puts it into listening state (backlog of 10) and starts the thread that runs
     * acceptConnections().
     *
     * @param address Address handed to connections.createSocket for the listening socket.
     * @throws BootstrapException if UDT::listen fails. The message contains UDT's
     *         description of the last error.
     */
    BootstrapNode::BootstrapNode(const Ipv4 &address) :
        socket(connections.createSocket(address, false, true))
    {
        if(UDT::listen(socket, 10) == UDT::ERROR)
        {
            // Capture the error text first: the close call below may replace UDT's last error.
            std::string errMsg = "UDT: Failed to listen, error: ";
            errMsg += UDT::getlasterror_desc();
            // The destructor does not run when the constructor throws, so the socket
            // has to be released here or it would stay open.
            UDT::close(socket);
            throw BootstrapException(errMsg);
        }
        acceptConnectionsThread = std::thread(&BootstrapNode::acceptConnections, this);
    }
    
    // Shuts the bootstrap node down. The order of the three steps matters.
    BootstrapNode::~BootstrapNode()
    {
        // Clear the flag first: acceptConnections() checks it after a failed accept
        // to tell a shutdown apart from a real error.
        connections.alive = false;
        // Close the listening socket; presumably this makes a blocked UDT::accept return.
        UDT::close(socket);
        // Wait for the accept loop to exit before the members are destroyed.
        acceptConnectionsThread.join();
    }
    
    void BootstrapNode::acceptConnections()
    {
        sockaddr_storage clientAddr;
        int addrLen = sizeof(clientAddr);

        while(connections.alive)
        {
            UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen);
            if(clientSocket == UDT::INVALID_SOCK)
            {
                // Connection was killed because bootstrap node was taken down
                if(!connections.alive)
                    return;
                Log::error("UDT: Failed to accept connection, error: %s", UDT::getlasterror_desc());
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }

            char clientHost[NI_MAXHOST];
            char clientService[NI_MAXSERV];
            getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
            Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket);
            
            UDT::epoll_add_usock(connections.eid, clientSocket);
            std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
            peer->socket = clientSocket;
            sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr;
            memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
            peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
            connections.peersMutex.lock();
            connections.peers[clientSocket] = peer;
            connections.peersMutex.unlock();
        }
    }
    
    /**
     * Receive callback for peers accepted by acceptConnections(): subscribes
     * @p newPeer to the pubsub key found at the start of @p data.
     *
     * If a peer with the same IPv4 address and port is already subscribed to the key,
     * nothing happens. Otherwise:
     *  - every peer already subscribed to the key is sent the key followed by the
     *    address of @p newPeer,
     *  - @p newPeer is added to the subscriber list of the key,
     *  - @p newPeer is sent the key followed by the addresses of all previously
     *    subscribed peers (only the key if there were none).
     *
     * Each address is serialized as u32 sin_family, u32 sin_addr.s_addr and
     * u16 sin_port, taken from the stored sockaddr_in without conversion.
     *
     * @param newPeer Peer that sent the subscribe request.
     * @param data    Received bytes; the first PUBSUB_KEY_LENGTH bytes are read as the key.
     * @param size    Number of bytes available in @p data.
     */
    void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
    {
        Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", newPeer->address.getAddress().c_str(), newPeer->address.getPort());
        // NOTE(review): a packet shorter than the key is presumably rejected by
        // SafeDeserializer::extract - confirm how that failure reaches the caller.
        sibs::SafeDeserializer deserializer((const u8*)data, size);
        PubsubKey pubsubKey;
        deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);

        // NOTE(review): subscribedPeers is accessed without a lock; confirm that this
        // callback is only ever invoked from a single thread.
        auto &peers = subscribedPeers[pubsubKey];
        for(auto &peer : peers)
        {
            // A peer is identified by IP and port. Comparing the IP alone would silently
            // drop a second peer on the same host or NAT address before it gets the peer list.
            const bool sameHost = peer->address.address.sin_addr.s_addr == newPeer->address.address.sin_addr.s_addr;
            const bool samePort = peer->address.address.sin_port == newPeer->address.address.sin_port;
            if(sameHost && samePort)
                return;
        }

        // Announcement of the new peer, shared by all sends to the existing subscribers.
        sibs::SafeSerializer serializer;
        serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
        serializer.add((u32)newPeer->address.address.sin_family);
        serializer.add((u32)newPeer->address.address.sin_addr.s_addr);
        serializer.add((u16)newPeer->address.address.sin_port);
        std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));

        auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
        {
            // Cast explicitly: an enum must not be handed to a %d format as-is.
            Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", (int)result, resultStr.c_str());
        };

        // List of existing subscribers, sent to the new peer once the loop is done.
        sibs::SafeSerializer newPeerSerializer;
        newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
        for(auto &peer : peers)
        {
            connections.send(peer, serializerData, sendCallbackFunc);
            newPeerSerializer.add((u32)peer->address.address.sin_family);
            newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr);
            newPeerSerializer.add((u16)peer->address.address.sin_port);
        }
        // Added only after the loop so the new peer is not told about itself.
        peers.push_back(newPeer);
        connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())), sendCallbackFunc);
    }
}