aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-19 23:01:52 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-19 23:03:20 +0200
commit54254462e432dcc6ef2bb306a9ee773d21314d19 (patch)
tree3334799426ba0186829de1bbffe0500a64331c31
parent3565289c19974ca874f87429cc74a87558249c8e (diff)
Retry put for 30 sec to wait for peer connections
-rw-r--r--include/sibs/BootstrapConnection.hpp18
-rw-r--r--src/BootstrapConnection.cpp230
-rw-r--r--src/BootstrapNode.cpp4
-rw-r--r--src/DirectConnection.cpp45
-rw-r--r--src/Log.cpp12
-rw-r--r--tests/main.cpp8
6 files changed, 187 insertions, 130 deletions
diff --git a/include/sibs/BootstrapConnection.hpp b/include/sibs/BootstrapConnection.hpp
index 9e2dfff..8156bd7 100644
--- a/include/sibs/BootstrapConnection.hpp
+++ b/include/sibs/BootstrapConnection.hpp
@@ -4,6 +4,7 @@
#include "DirectConnection.hpp"
#include "PubsubKey.hpp"
#include <mutex>
+#include <atomic>
namespace sibs
{
@@ -23,6 +24,13 @@ namespace sibs
// Return false if you want to stop listening on the key
using BoostrapConnectionListenCallbackFunc = std::function<bool(const DirectConnectionPeer *peer, const void *data, const usize size)>;
+ struct SubscribeData
+ {
+ BoostrapConnectionListenCallbackFunc listenCallbackFunc = nullptr;
+ std::vector<std::shared_ptr<DirectConnectionPeer>> peers;
+ i64 listenStartTimeMs = 0;
+ };
+
struct ListenHandle
{
PubsubKey key;
@@ -34,6 +42,7 @@ namespace sibs
public:
// Throws BootstrapConnectionException on error
BootstrapConnection(const Ipv4 &bootstrapAddress);
+ ~BootstrapConnection();
// If we are already listening on the key @pubsubKey then the callback function is overwritten
ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc);
@@ -46,14 +55,15 @@ namespace sibs
std::vector<std::shared_ptr<DirectConnectionPeer>> getPeers();
private:
+ ListenHandle listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc);
void receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
void receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size);
private:
DirectConnections connections;
std::shared_ptr<DirectConnectionPeer> serverPeer;
- PubsubKeyMap<BoostrapConnectionListenCallbackFunc> listenCallbackFuncs;
- PubsubKeyMap<std::vector<std::shared_ptr<DirectConnectionPeer>>> subscribedPeers;
- std::recursive_mutex listenerCallbackFuncMutex;
- std::recursive_mutex subscribedPeersMutex;
+ PubsubKeyMap<SubscribeData> subscribeData;
+ std::recursive_mutex subscribeDataMutex;
+ bool alive;
+ std::atomic_int putThreadCount;
};
}
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 35bb11e..f83b21c 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -1,22 +1,31 @@
#include "../include/sibs/BootstrapConnection.hpp"
#include "../include/Log.hpp"
#include <sibs/SafeDeserializer.hpp>
+#include <chrono>
+#include <unordered_set>
+
+namespace chrono = std::chrono;
namespace sibs
{
- BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress)
+ BootstrapConnection::BootstrapConnection(const Ipv4 &bootstrapAddress) :
+ alive(true),
+ putThreadCount(0)
{
connections.onRemoveDisconnectedPeer([this](std::shared_ptr<DirectConnectionPeer> peer)
{
- std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
- for(auto &topicUsers : subscribedPeers)
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ for(auto &it : subscribeData)
{
- for(auto it = topicUsers.second.begin(); it != topicUsers.second.end(); )
+ for(auto subscribedPeersIt = it.second.peers.begin(), end = it.second.peers.end();
+ subscribedPeersIt != end;
+ ++subscribedPeersIt)
{
- if(peer->address == (*it)->address)
- it = topicUsers.second.erase(it);
- else
- ++it;
+ if(peer->address == (*subscribedPeersIt)->address)
+ {
+ it.second.peers.erase(subscribedPeersIt);
+ break;
+ }
}
}
});
@@ -44,6 +53,15 @@ namespace sibs
throw BootstrapConnectionException(errMsg);
}
}
+
+ BootstrapConnection::~BootstrapConnection()
+ {
+ alive = false;
+ while(putThreadCount > 0)
+ {
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+ }
// TODO: This is vulnerable against MitM attack, replace with asymmetric cryptography, get data signed with server private key and verify against known server public key
void BootstrapConnection::receiveDataFromServer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
@@ -60,12 +78,14 @@ namespace sibs
deserializer.extract(pubsubKey.data.data(), PUBSUB_KEY_LENGTH);
// we want lock to live this whole scope so we dont connect to peer when cancelListen is called
- std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
- auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
- if(listenerFuncIt == listenCallbackFuncs.end())
{
- Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
- return;
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto subscribeDataIt = subscribeData.find(pubsubKey);
+ if(subscribeDataIt == subscribeData.end())
+ {
+ Log::debug("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ return;
+ }
}
while(!deserializer.empty())
@@ -85,8 +105,14 @@ namespace sibs
{
if(result == PubSubResult::OK)
{
- std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
- subscribedPeers[pubsubKey].push_back(peer);
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto subscribeDataIt = subscribeData.find(pubsubKey);
+ if(subscribeDataIt == subscribeData.end())
+ {
+ Log::warn("BootstrapConnection::receiveDataFromServer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ return;
+ }
+ subscribeDataIt->second.peers.push_back(peer);
++peer->sharedKeys;
Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
}
@@ -111,16 +137,16 @@ namespace sibs
memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH);
if(messageType == MessageType::DATA)
{
- listenerCallbackFuncMutex.lock();
- auto listenerFuncIt = listenCallbackFuncs.find(pubsubKey);
- if(listenerFuncIt == listenCallbackFuncs.end())
+ subscribeDataMutex.lock();
+ auto listenerFuncIt = subscribeData.find(pubsubKey);
+ if(listenerFuncIt == subscribeData.end())
{
- listenerCallbackFuncMutex.unlock();
+ subscribeDataMutex.unlock();
Log::debug("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
return;
}
- auto listenCallbackFunc = listenerFuncIt->second;
- listenerCallbackFuncMutex.unlock();
+ auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc;
+ subscribeDataMutex.unlock();
if(listenCallbackFunc)
{
@@ -132,17 +158,17 @@ namespace sibs
else if(messageType == MessageType::UNSUBSCRIBE)
{
Log::debug("BootstrapConnection::receiveDataFromPeer: peer (ip: %s, port: %d) unsubscribed from key '%s'", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
- std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex);
- auto peersListIt = subscribedPeers.find(pubsubKey);
- if(peersListIt == subscribedPeers.end())
+ std::lock_guard<std::recursive_mutex> subscribersMutex(subscribeDataMutex);
+ auto peersListIt = subscribeData.find(pubsubKey);
+ if(peersListIt == subscribeData.end())
return;
- for(auto it = peersListIt->second.begin(); it != peersListIt->second.end(); ++it)
+ for(auto it = peersListIt->second.peers.begin(); it != peersListIt->second.peers.end(); ++it)
{
auto existingPeer = *it;
if(*existingPeer == *peer)
{
- peersListIt->second.erase(it);
+ peersListIt->second.peers.erase(it);
--peer->sharedKeys;
if(peer->sharedKeys <= 0)
connections.removePeer(peer->socket->udtSocket);
@@ -155,23 +181,28 @@ namespace sibs
Log::warn("BootstrapConnection::receiveDataFromPeer: received message from peer that was not data or unsubscribe");
}
}
-
- ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
+
+ ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc, bool registerCallbackFunc)
{
{
- std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
- auto it = listenCallbackFuncs.find(pubsubKey);
- if(it != listenCallbackFuncs.end())
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto it = subscribeData.find(pubsubKey);
+ if(it != subscribeData.end())
{
Log::warn("BootstrapConnection::listen called on existing listener, overwriting callback function");
- it->second = callbackFunc;
+ if(registerCallbackFunc)
+ it->second.listenCallbackFunc = callbackFunc;
return { pubsubKey };
}
- listenCallbackFuncs[pubsubKey] = callbackFunc;
+ auto &data = subscribeData[pubsubKey];
+ if(registerCallbackFunc)
+ data.listenCallbackFunc = callbackFunc;
+ data.listenStartTimeMs = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
}
auto message = std::make_shared<Message>(MessageType::SUBSCRIBE);
message->append(pubsubKey.data.data(), pubsubKey.data.size());
+ Log::debug("BootstrapConnection::listen: starting to listen to %s", pubsubKey.toString().c_str());
connections.send(serverPeer, message, [](PubSubResult result, const std::string &resultStr)
{
Log::debug("BootstrapConnection::listen: PubSubResult: %d, result string: %s", result, resultStr.c_str());
@@ -180,81 +211,126 @@ namespace sibs
return { pubsubKey };
}
+ ListenHandle BootstrapConnection::listen(const PubsubKey &pubsubKey, BoostrapConnectionListenCallbackFunc callbackFunc)
+ {
+ return listen(pubsubKey, callbackFunc, true);
+ }
+
bool BootstrapConnection::put(const PubsubKey &pubsubKey, const void *data, const usize size)
{
if(size > 819200) // 800kb
return false;
{
- std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
- auto listenCallbackFuncIt = listenCallbackFuncs.find(pubsubKey);
- if(listenCallbackFuncIt != listenCallbackFuncs.end() && listenCallbackFuncIt->second)
- listenCallbackFuncIt->second(nullptr, data, size);
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto listenCallbackFuncIt = subscribeData.find(pubsubKey);
+ if(listenCallbackFuncIt != subscribeData.end() && listenCallbackFuncIt->second.listenCallbackFunc)
+ listenCallbackFuncIt->second.listenCallbackFunc(nullptr, data, size);
- if(listenCallbackFuncIt == listenCallbackFuncs.end())
- Log::warn("BootstrapConnection::put on key '%s' which we are not listening to", pubsubKey.toString().c_str());
- }
-
- std::lock_guard<std::recursive_mutex> lock(subscribedPeersMutex);
- auto peersIt = subscribedPeers.find(pubsubKey);
- if(peersIt == subscribedPeers.end())
- {
- Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str());
- return true;
+ if(listenCallbackFuncIt == subscribeData.end())
+ {
+ Log::warn("BootstrapConnection::put on key '%s' which we are not listening to, automatically listening now", pubsubKey.toString().c_str());
+ listen(pubsubKey, nullptr, false);
+ }
}
auto message = std::make_shared<Message>(MessageType::DATA);
message->append(pubsubKey.data.data(), pubsubKey.data.size());
message->append(data, size);
- for(auto &peer : peersIt->second)
+ std::thread([this, pubsubKey, message]()
{
- connections.send(peer, message);
- }
+ ++putThreadCount;
+ std::unordered_set<int> peersMessaged;
+ std::vector<std::shared_ptr<DirectConnectionPeer>> peersToMessage;
+ i64 listenStartTimeMs = 0;
+ while(alive)
+ {
+ {
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto subscribeIt = subscribeData.find(pubsubKey);
+ if(subscribeIt == subscribeData.end())
+ {
+ Log::warn("BootstrapConnection::put with no subscribers on same key '%s'", pubsubKey.toString().c_str());
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ continue;
+ }
+
+ listenStartTimeMs = subscribeIt->second.listenStartTimeMs;
+
+ for(auto &peer : subscribeIt->second.peers)
+ {
+ if(peersMessaged.find(peer->socket->udtSocket) != peersMessaged.end())
+ continue;
+ peersToMessage.push_back(peer);
+ peersMessaged.insert(peer->socket->udtSocket);
+ }
+ }
+
+ if(!peersToMessage.empty())
+ {
+ for(auto &peer : peersToMessage)
+ {
+ connections.send(peer, message);
+ }
+ peersToMessage.clear();
+ }
+
+ const i64 half_min_ms = 1000 * 30;
+ i64 now = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now().time_since_epoch()).count();
+ if(now - listenStartTimeMs > half_min_ms)
+ break;
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+ }
+ --putThreadCount;
+ }).detach();
return true;
}
bool BootstrapConnection::cancelListen(const ListenHandle &listener)
{
+ std::vector<std::shared_ptr<DirectConnectionPeer>> peersToMessage;
{
- std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
- auto it = listenCallbackFuncs.find(listener.key);
- if(it == listenCallbackFuncs.end())
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto it = subscribeData.find(listener.key);
+ if(it == subscribeData.end())
return false;
- listenCallbackFuncs.erase(it);
- auto message = std::make_shared<Message>(MessageType::UNSUBSCRIBE);
- message->append(listener.key.data.data(), listener.key.data.size());
- connections.send(serverPeer, message);
+ peersToMessage = it->second.peers;
+ }
- std::lock_guard<std::recursive_mutex> subscribersMutex(subscribedPeersMutex);
- auto peersListIt = subscribedPeers.find(listener.key);
- // this will happen if there are no other peers subscribed to the key
- if(peersListIt == subscribedPeers.end())
- return true;
+ auto message = std::make_shared<Message>(MessageType::UNSUBSCRIBE);
+ message->append(listener.key.data.data(), listener.key.data.size());
+ connections.send(serverPeer, message);
- for(auto &peer : peersListIt->second)
+ for(auto &peer : peersToMessage)
+ {
+ --peer->sharedKeys;
+ if(peer->sharedKeys <= 0)
{
- --peer->sharedKeys;
- if(peer->sharedKeys <= 0)
- {
- // disconnect from peer
- connections.removePeer(peer->socket->udtSocket);
- }
- else
- {
- // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore
- connections.send(peer, message);
- }
+ // disconnect from peer
+ connections.removePeer(peer->socket->udtSocket);
}
- subscribedPeers.erase(peersListIt);
+ else
+ {
+ // unsubscribe from peers request, even if they dont accept it we wont listen to messages from the key anymore
+ connections.send(peer, message);
+ }
+ }
+
+ {
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ auto it = subscribeData.find(listener.key);
+ if(it != subscribeData.end())
+ subscribeData.erase(it);
}
return true;
}
bool BootstrapConnection::areWeListeningOnKey(const PubsubKey &pubsubKey)
{
- std::lock_guard<std::recursive_mutex> lock(listenerCallbackFuncMutex);
- return listenCallbackFuncs.find(pubsubKey) != listenCallbackFuncs.end();
+ std::lock_guard<std::recursive_mutex> lock(subscribeDataMutex);
+ return subscribeData.find(pubsubKey) != subscribeData.end();
}
std::vector<std::shared_ptr<DirectConnectionPeer>> BootstrapConnection::getPeers()
diff --git a/src/BootstrapNode.cpp b/src/BootstrapNode.cpp
index 81df6d7..f6fc0c3 100644
--- a/src/BootstrapNode.cpp
+++ b/src/BootstrapNode.cpp
@@ -17,7 +17,7 @@ namespace sibs
connections(address.getPort()),
socket(connections.createSocket(address, false, true))
{
- if(connections.port != address.getPort())
+ if(address.getPort() == 0 || connections.port != address.getPort())
{
throw SocketCreateException("BootstrapNode: Failed to bind port " + std::to_string(address.getPort()));
}
@@ -75,7 +75,7 @@ namespace sibs
char clientHost[NI_MAXHOST];
char clientService[NI_MAXSERV];
getnameinfo((sockaddr *)&clientAddr, addrLen, clientHost, sizeof(clientHost), clientService, sizeof(clientService), NI_NUMERICHOST | NI_NUMERICSERV);
- Log::debug("UDT: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket);
+ Log::debug("BootstrapNode::acceptConnections: New connection: %s:%s (socket: %d)", clientHost, clientService, clientUdtSocket);
std::lock_guard<std::mutex> lock(connections.peersMutex);
UDT::epoll_add_usock(connections.eid, clientUdtSocket);
diff --git a/src/DirectConnection.cpp b/src/DirectConnection.cpp
index 8034c50..010b8af 100644
--- a/src/DirectConnection.cpp
+++ b/src/DirectConnection.cpp
@@ -82,46 +82,19 @@ namespace sibs
if(rendezvous || bind)
{
- if(reuseAddr)
+ Ipv4 myAddr = addressToBind;
+ for(int i = 0; i < 2000; ++i)
{
- /*
- if(UDT::bind(socket, (sockaddr*)&addressToBind.address, sizeof(addressToBind.address)) == UDT::ERROR)
+ if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR)
{
- std::string errMsg = "UDT: Failed to bind, error: ";
- errMsg += UDT::getlasterror_desc();
- throw SocketCreateException(errMsg);
+ port = (u16)generateRandomNumber(2000, 32000);
+ Log::warn("DirectConnections::createSocket: failed to bind socket to port %d, trying port %d. Fail reason: %s", myAddr.getPort(), port, UDT::getlasterror_desc());
+ myAddr.address.sin_port = htons(port);
}
- */
- Ipv4 myAddr = addressToBind;
- for(int i = 0; i < 2000; ++i)
- {
- if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR)
- {
- u16 newPort = (u16)generateRandomNumber(2000, 32000);
- Log::warn("DirectConnections: failed to bind socket to port %d, trying port %d. Fail reason: %s", port, newPort, UDT::getlasterror_desc());
- port = newPort;
- myAddr.address.sin_port = htons(port);
- }
- else
- return socket;
- }
- throw SocketCreateException("UDT: Failed to bind after 2000 tries");
- }
- else
- {
- Ipv4 myAddr = addressToBind;
- for(int i = 0; i < 2000; ++i)
- {
- if(UDT::bind(udtSocket, (sockaddr*)&myAddr.address, sizeof(myAddr.address)) == UDT::ERROR)
- {
- port = (u16)generateRandomNumber(2000, 32000);
- myAddr.address.sin_port = htons(port);
- }
- else
- return socket;
- }
- throw SocketCreateException("UDT: Failed to bind after 2000 tries");
+ else
+ return socket;
}
+ throw SocketCreateException("UDT: Failed to bind after 2000 tries");
}
return socket;
diff --git a/src/Log.cpp b/src/Log.cpp
index df424f0..a7593ec 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -13,9 +13,9 @@ namespace sibs
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("\033[1;32mDebug:\033[0m ", stdout);
- vfprintf(stdout, fmt, args);
- fputs("\n", stdout);
+ fputs("\033[1;32mDebug:\033[0m ", stderr);
+ vfprintf(stderr, fmt, args);
+ fputs("\n", stderr);
va_end(args);
}
@@ -24,9 +24,9 @@ namespace sibs
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("\033[1;33mWarning:\033[0m ", stdout);
- vfprintf(stdout, fmt, args);
- fputs("\n", stdout);
+ fputs("\033[1;33mWarning:\033[0m ", stderr);
+ vfprintf(stderr, fmt, args);
+ fputs("\n", stderr);
va_end(args);
}
diff --git a/tests/main.cpp b/tests/main.cpp
index 2d1d505..d6c37e2 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -36,12 +36,10 @@ int main()
gotAsdf2 = true;
return true;
});
- // wait until connection1 and connection2 receive each other as peers from bootstrap node
- std::this_thread::sleep_for(std::chrono::seconds(3));
-
+
connection1.put(key, "hello", 5);
- connection1.put(key, "asdf", 4);
- std::this_thread::sleep_for(std::chrono::seconds(3));
+ connection2.put(key, "asdf", 4);
+ std::this_thread::sleep_for(std::chrono::seconds(6));
REQUIRE(gotData1);
REQUIRE(gotAsdf1);