aboutsummaryrefslogtreecommitdiff
path: root/src/BootstrapConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/BootstrapConnection.cpp')
-rw-r--r--src/BootstrapConnection.cpp66
1 files changed, 51 insertions, 15 deletions
diff --git a/src/BootstrapConnection.cpp b/src/BootstrapConnection.cpp
index 4ace97c..d914646 100644
--- a/src/BootstrapConnection.cpp
+++ b/src/BootstrapConnection.cpp
@@ -114,11 +114,7 @@ namespace sibs
{
in_addr_t ipv4Address = deserializer.extract<u32>();
u16 port = deserializer.extract<u16>();
- Ipv4 newPeerAddress;
- newPeerAddress.address.sin_family = addressFamily;
- newPeerAddress.address.sin_addr.s_addr = ipv4Address;
- newPeerAddress.address.sin_port = port;
- memset(newPeerAddress.address.sin_zero, 0, sizeof(newPeerAddress.address.sin_zero));
+ Ipv4 newPeerAddress(addressFamily, ipv4Address, port);
Log::debug("BootstrapConnection::receiveDataFromServer: received subscriber (ip: %s, port: %d) from bootstrap node", newPeerAddress.getAddress().c_str(), newPeerAddress.getPort());
connections.connect(newPeerAddress, [this, pubsubKey](std::shared_ptr<DirectConnectionPeer> peer, PubSubResult result, const std::string &resultStr)
{
@@ -132,7 +128,6 @@ namespace sibs
return;
}
subscribeDataIt->second.peers.push_back(peer);
- ++peer->sharedKeys;
Log::debug("BootstrapConnection::receiveDataFromServer: Connected to peer (ip: %s, port: %d) given by bootstrap node", peer->address.getAddress().c_str(), peer->address.getPort());
}
else
@@ -149,11 +144,18 @@ namespace sibs
void BootstrapConnection::receiveDataFromPeer(std::shared_ptr<DirectConnectionPeer> peer, MessageType messageType, const void *data, const usize size)
{
- if(size < PUBSUB_KEY_LENGTH)
- return;
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ Ipv4 targetAddress;
+ if(peer == serverPeer)
+ {
+ u16 addressType = deserializer.extract<u16>();
+ u32 address = deserializer.extract<u32>();
+ u16 port = deserializer.extract<u16>();
+ targetAddress = Ipv4(addressType, address, port);
+ }
PubsubKey pubsubKey;
- memcpy(pubsubKey.data.data(), data, PUBSUB_KEY_LENGTH);
+ deserializer.extract(pubsubKey.data.data(), pubsubKey.data.size());
if(messageType == MessageType::DATA)
{
subscribeDataMutex.lock();
@@ -165,11 +167,30 @@ namespace sibs
return;
}
auto listenCallbackFunc = listenerFuncIt->second.listenCallbackFunc;
+ if(peer == serverPeer)
+ {
+ for(auto pubsubPeer : listenerFuncIt->second.peers)
+ {
+ if(pubsubPeer->address == targetAddress)
+ {
+ peer = pubsubPeer;
+ Log::debug("BootstrapConnection::receiveDataFromPeer: Received routed data from server, originator peer: ip: %s, port: %d", peer->address.getAddress().c_str(), peer->address.getPort(), pubsubKey.toString().c_str());
+ break;
+ }
+ }
+
+ if(peer == serverPeer)
+ {
+ Log::warn("BootstrapConnection::receiveDataFromPeer: No listener found for key '%s', ignoring...", pubsubKey.toString().c_str());
+ subscribeDataMutex.unlock();
+ return;
+ }
+ }
subscribeDataMutex.unlock();
if(listenCallbackFunc)
{
- bool continueListening = listenCallbackFunc(peer.get(), (const u8*)data + PUBSUB_KEY_LENGTH, size - PUBSUB_KEY_LENGTH);
+ bool continueListening = listenCallbackFunc(peer.get(), deserializer.getBuffer(), deserializer.getSize());
if(!continueListening)
cancelListen({ pubsubKey });
}
@@ -257,11 +278,12 @@ namespace sibs
listen(pubsubKey, nullptr, false);
}
}
-
+
+ std::vector<unsigned char> containedData((unsigned char*)data, (unsigned char*)data + size);
auto message = std::make_shared<Message>(MessageType::DATA);
message->append(pubsubKey.data.data(), pubsubKey.data.size());
- message->append(data, size);
- std::thread([this, pubsubKey, message]()
+ message->append(containedData.data(), containedData.size());
+ std::thread([this, pubsubKey, message](std::vector<unsigned char> containedData)
{
++putThreadCount;
std::unordered_set<int> peersMessaged;
@@ -294,7 +316,20 @@ namespace sibs
{
for(auto &peer : peersToMessage)
{
- connections.send(peer, message);
+ if(peer->routed)
+ {
+ auto routedMessage = std::make_shared<Message>(MessageType::DATA);
+ routedMessage->serializer.add((u16)peer->address.address.sin_family);
+ routedMessage->serializer.add((u32)peer->address.address.sin_addr.s_addr);
+ routedMessage->serializer.add((u16)peer->address.address.sin_port);
+ routedMessage->append(pubsubKey.data.data(), pubsubKey.data.size());
+ routedMessage->append(containedData.data(), containedData.size());
+ connections.send(serverPeer, routedMessage);
+ }
+ else
+ {
+ connections.send(peer, message);
+ }
}
peersToMessage.clear();
}
@@ -307,7 +342,7 @@ namespace sibs
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
--putThreadCount;
- }).detach();
+ }, std::move(containedData)).detach();
return true;
}
@@ -327,6 +362,7 @@ namespace sibs
message->append(listener.key.data.data(), listener.key.data.size());
connections.send(serverPeer, message);
+ // TODO: Unsubscribe to peers we are connected to indirectly (routed data)
for(auto &peer : peersToMessage)
{
--peer->sharedKeys;