aboutsummaryrefslogtreecommitdiff
path: root/src/BootstrapNode.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/BootstrapNode.cpp')
-rw-r--r--src/BootstrapNode.cpp117
1 files changed, 76 insertions, 41 deletions
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index 273abf0..5c62e64 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -14,9 +14,14 @@
namespace sibs
{
BootstrapNode::BootstrapNode(const Ipv4 &address) :
- connections(27130),
+ connections(address.getPort()),
socket(connections.createSocket(address, false, true))
{
+ if(connections.port != address.getPort())
+ {
+ throw SocketCreateException("BootstrapNode: Failed to bind port " + std::to_string(address.getPort()));
+ }
+
connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
{
std::lock_guard<std::mutex> lock(subscribedPeersMutex);
@@ -32,7 +37,7 @@ namespace sibs
}
});
- if(UDT::listen(socket, 10) == UDT::ERROR)
+ if(UDT::listen(socket->udtSocket, 10) == UDT::ERROR)
{
std::string errMsg = "UDT: Failed to listen, error: ";
errMsg += UDT::getlasterror_desc();
@@ -43,8 +48,9 @@ namespace sibs
BootstrapNode::~BootstrapNode()
{
+ std::lock_guard<std::mutex> lock(connections.peersMutex);
connections.alive = false;
- UDT::close(socket);
+ socket.reset();
acceptConnectionsThread.join();
}
@@ -55,8 +61,8 @@ namespace sibs
while(connections.alive)
{
- UDTSOCKET clientSocket = UDT::accept(socket, (sockaddr*)&clientAddr, &addrLen);
- if(clientSocket == UDT::INVALID_SOCK)
+ UDTSOCKET clientUdtSocket = UDT::accept(socket->udtSocket, (sockaddr*)&clientAddr, &addrLen);
+ if(clientUdtSocket == UDT::INVALID_SOCK)
{
// Connection was killed because bootstrap node was taken down
if(!connections.alive)
@@ -65,61 +71,90 @@ namespace sibs
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
+ auto clientSocket = std::make_unique<Socket>(clientUdtSocket);
char clientHost[NI_MAXHOST];
char clientService[NI_MAXSERV];
getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
- Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientSocket);
+ Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket);
- UDT::epoll_add_usock(connections.eid, clientSocket);
+ std::lock_guard<std::mutex> lock(connections.peersMutex);
+ UDT::epoll_add_usock(connections.eid, clientUdtSocket);
+ clientSocket->eid = connections.eid;
std::shared_ptr<DirectConnectionPeer> peer = std::make_shared<DirectConnectionPeer>();
- peer->socket = clientSocket;
+ peer->socket = std::move(clientSocket);
sockaddr_in *clientAddrSock = (sockaddr_in*)&clientAddr;
memcpy(&peer->address.address, clientAddrSock, sizeof(peer->address.address));
- peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::peerSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
- connections.peersMutex.lock();
- connections.peers[clientSocket] = peer;
- connections.peersMutex.unlock();
+ peer->receiveDataCallbackFunc = std::bind(&BootstrapNode::messageFromClient, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
+ connections.peers[clientUdtSocket] = peer;
}
}
- void BootstrapNode::peerSubscribe(std::shared_ptr<DirectConnectionPeer> newPeer, const void *data, const usize size)
+ void BootstrapNode::messageFromClient(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
- Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", newPeer->address.getAddress().c_str(), newPeer->address.getPort());
sibs::SafeDeserializer deserializer((const u8*)data, size);
PubsubKey pubsubKey;
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
-
- std::lock_guard<std::mutex> lock(subscribedPeersMutex);
- auto &peers = subscribedPeers[pubsubKey];
- for(auto &peer : peers)
+
+ if(messageType == MessageType::SUBSCRIBE)
{
- if(peer->address == newPeer->address)
- return;
+ Log::debug("BootstrapNode: Received peer subscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto &existingPeer : peers)
+ {
+ if(existingPeer->address == peer->address)
+ return;
+ }
+
+ sibs::SafeSerializer serializer;
+ serializer.add((u32)peer->address.address.sin_family);
+ serializer.add((u32)peer->address.address.sin_addr.s_addr);
+ serializer.add((u16)peer->address.address.sin_port);
+ auto newPeerMessage = std::make_shared<Message>(MessageType::SUBSCRIBE);
+ newPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ newPeerMessage->append(serializer.getBuffer().data(), serializer.getBuffer().size());
+ auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
+ {
+ Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str());
+ };
+ for(auto &existingPeer : peers)
+ {
+ connections.send(existingPeer, newPeerMessage, sendCallbackFunc);
+ }
+
+ sibs::SafeSerializer newPeerSerializer;
+ for(auto &existingPeer : peers)
+ {
+ newPeerSerializer.add((u32)existingPeer->address.address.sin_family);
+ newPeerSerializer.add((u32)existingPeer->address.address.sin_addr.s_addr);
+ newPeerSerializer.add((u16)existingPeer->address.address.sin_port);
+ }
+ peers.push_back(peer);
+
+ auto existingPeerMessage = std::make_shared<Message>(MessageType::SUBSCRIBE);
+ existingPeerMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ existingPeerMessage->append(newPeerSerializer.getBuffer().data(), newPeerSerializer.getBuffer().size());
+ connections.send(peer, existingPeerMessage, sendCallbackFunc);
}
-
- sibs::SafeSerializer serializer;
- serializer.add(pubsubKey.data.data(), pubsubKey.data.size());
- serializer.add((u32)newPeer->address.address.sin_family);
- serializer.add((u32)newPeer->address.address.sin_addr.s_addr);
- serializer.add((u16)newPeer->address.address.sin_port);
- std::shared_ptr<std::vector<u8>> serializerData = std::make_shared<std::vector<u8>>(std::move(serializer.getBuffer()));
-
- auto sendCallbackFunc = [](PubSubResult result, const std::string &resultStr)
+ else if(messageType == MessageType::UNSUBSCRIBE)
{
- Log::debug("BootstrapNode::peerSubscribe send result: %d, result string: %s", result, resultStr.c_str());
- };
-
- sibs::SafeSerializer newPeerSerializer;
- newPeerSerializer.add(pubsubKey.data.data(), pubsubKey.data.size());
- for(auto &peer : peers)
+ Log::debug("BootstrapNode: Received peer unsubscribe from (ip: %s, port: %d)", peer->address.getAddress().c_str(), peer->address.getPort());
+ std::lock_guard<std::mutex> lock(subscribedPeersMutex);
+ auto &peers = subscribedPeers[pubsubKey];
+ for(auto it = peers.begin(); it != peers.end(); ++it)
+ {
+ auto existingPeer = *it;
+ if(existingPeer->address == peer->address)
+ {
+ peers.erase(it);
+ break;
+ }
+ }
+ }
+ else
{
- connections.send(peer, serializerData, sendCallbackFunc);
- newPeerSerializer.add((u32)peer->address.address.sin_family);
- newPeerSerializer.add((u32)peer->address.address.sin_addr.s_addr);
- newPeerSerializer.add((u16)peer->address.address.sin_port);
+ Log::warn("BootstrapNode: received message from client that was not subscribe or unsubscribe");
}
- peers.push_back(newPeer);
- connections.send(newPeer, std::make_shared<std::vector<u8>>(std::move(newPeerSerializer.getBuffer())), sendCallbackFunc);
}
}