aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-05-16 07:25:22 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 23:25:46 +0200
commit92d6393a34dac4b3d623a5169e2b50a9518b4976 (patch)
treea318cc84e3837f61c03cd4a0533c3380bb5fdbde
parentc0574f16ca7960a3301230ba36ba96148818e229 (diff)
Add functions to send/receive custom messages
-rw-r--r--include/odhtdb/Database.hpp11
-rw-r--r--include/odhtdb/DhtKey.hpp5
-rw-r--r--project.conf2
-rw-r--r--src/Database.cpp50
-rw-r--r--src/DhtKey.cpp9
-rw-r--r--tests/main.cpp33
6 files changed, 102 insertions, 8 deletions
diff --git a/include/odhtdb/Database.hpp b/include/odhtdb/Database.hpp
index a68c601..bd7b3f5 100644
--- a/include/odhtdb/Database.hpp
+++ b/include/odhtdb/Database.hpp
@@ -17,6 +17,7 @@
#include <vector>
#include <ntp/NtpClient.hpp>
#include <boost/filesystem/path.hpp>
+#include <sibs/SafeSerializer.hpp>
#include <stdexcept>
#include <functional>
@@ -146,6 +147,9 @@ namespace odhtdb
using CreateNodeCallbackFunc = std::function<void(const DatabaseCreateNodeRequest&)>;
using AddNodeCallbackFunc = std::function<void(const DatabaseAddNodeRequest&)>;
using AddUserCallbackFunc = std::function<void(const DatabaseAddUserRequest&)>;
+
+ using ReceiveCustomMessageCallbackFunc = std::function<sibs::SafeSerializer(const void *data, usize size)>;
+ using SendCustomMessageCallbackFunc = std::function<bool(bool gotResponse, const void *data, usize size)>;
struct DatabaseCallbackFuncs
{
@@ -191,6 +195,13 @@ namespace odhtdb
std::vector<NodeUserKeyPair> getStoredUserNodeDataDecrypted(const std::string &username, const std::string &password);
std::vector<OwnedMemory> getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const;
+
+ void receiveCustomMessage(const dht::InfoHash &requestKey, ReceiveCustomMessageCallbackFunc callbackFunc);
+
+ // Return true in @callbackFunc if you want to continue listening for responses, otherwise return false
+ void sendCustomMessage(const dht::InfoHash &key, std::vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc);
+
+ static dht::InfoHash getInfoHash(const void *data, usize size);
private:
void deserializeCreateRequest(const std::shared_ptr<dht::Value> &value, const Hash &hash, const std::shared_ptr<OwnedMemory> encryptionKey);
void deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const std::shared_ptr<OwnedMemory> encryptionKey);
diff --git a/include/odhtdb/DhtKey.hpp b/include/odhtdb/DhtKey.hpp
index 7c30ee3..c3398bf 100644
--- a/include/odhtdb/DhtKey.hpp
+++ b/include/odhtdb/DhtKey.hpp
@@ -9,9 +9,10 @@ namespace odhtdb
{
public:
DhtKey(const Hash &key);
+ DhtKey(const dht::InfoHash &infoHash);
- const dht::InfoHash& getNewDataListenerKey();
- const dht::InfoHash& getRequestOldDataKey();
+ dht::InfoHash getNewDataListenerKey();
+ dht::InfoHash getRequestOldDataKey();
private:
dht::InfoHash infoHash;
unsigned char firstByteOriginalValue;
diff --git a/project.conf b/project.conf
index 6d45e26..b3e01cc 100644
--- a/project.conf
+++ b/project.conf
@@ -12,7 +12,7 @@ expose_include_dirs = ["include"]
opendht = "1.7.0"
libsodium = "1.0.16"
ntpclient = "0.3.0"
-sibs-serializer = "1.0.0"
+sibs-serializer = "1.0.1"
boost-filesystem = "1.66.0"
boost-uuid = "1.66.0"
argon2 = "2017.12.27"
diff --git a/src/Database.cpp b/src/Database.cpp
index 241d9a5..5b3f705 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -215,7 +215,7 @@ namespace odhtdb
Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
DhtKey dhtKey(*nodeToSeed.getRequestHash());
- auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> &value)
+ auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value)
{
Log::debug("Seed: New data listener received data...");
const Hash requestHash(value->data.data(), value->data.size());
@@ -233,7 +233,7 @@ namespace odhtdb
newSeedInfo.reponseKeyInfoHash = responseKeyShared;
// TODO: If this response key is spammed, generate a new one.
- auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> &value)
+ auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value)
{
const Hash requestHash(value->data.data(), value->data.size());
if(requestHash == *nodeToSeed.getRequestHash())
@@ -245,7 +245,7 @@ namespace odhtdb
// TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
- auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> &value)
+ auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value)
{
Log::debug("Request: Got request to send old data");
try
@@ -670,4 +670,48 @@ namespace odhtdb
{
return databaseStorage.getUserGroups(nodeHash, userPublicKey);
}
+
+ void Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
+ {
+ dht::InfoHash responseKey = receiveMessageKey;
+ ++responseKey[0];
+
+ node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value)
+ {
+ sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size());
+ if(!serializer.getBuffer().empty())
+ {
+ Value responseValue(move(serializer.getBuffer()));
+ node.put(responseKey, move(responseValue), [](bool ok)
+ {
+ if(!ok)
+ Log::error("Failed to respond to custom message");
+ });
+ }
+ return true;
+ });
+ }
+
+ void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc)
+ {
+ dht::InfoHash responseKey = requestKey;
+ ++responseKey[0];
+
+ node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value)
+ {
+ return callbackFunc(true, value->data.data(), value->data.size());
+ });
+
+ Value value(move(data));
+ node.put(requestKey, move(value), [](bool ok)
+ {
+ if(!ok)
+ Log::error("Failed to send custom message");
+ });
+ }
+
+ dht::InfoHash Database::getInfoHash(const void *data, usize size)
+ {
+ return dht::InfoHash((const u8*)data, size);
+ }
}
diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp
index 7bd6acf..422b715 100644
--- a/src/DhtKey.cpp
+++ b/src/DhtKey.cpp
@@ -8,13 +8,18 @@ namespace odhtdb
firstByteOriginalValue = infoHash[0];
}
- const dht::InfoHash& DhtKey::getNewDataListenerKey()
+ DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash)
+ {
+ firstByteOriginalValue = infoHash[0];
+ }
+
+ dht::InfoHash DhtKey::getNewDataListenerKey()
{
infoHash[0] = firstByteOriginalValue;
return infoHash;
}
- const dht::InfoHash& DhtKey::getRequestOldDataKey()
+ dht::InfoHash DhtKey::getRequestOldDataKey()
{
infoHash[0] = firstByteOriginalValue + 1;
return infoHash;
diff --git a/tests/main.cpp b/tests/main.cpp
index dd56164..e819684 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -266,6 +266,39 @@ int main()
assertEquals(1, createNodeCounter);
assertEquals(2, addDataCounter);
assertEquals(1, addUserCounter);
+
+ dht::InfoHash customMessageKey = Database::getInfoHash("asdf", 4);
+ sibs::SafeSerializer messageToSendSerializer;
+ messageToSendSerializer.add((u32)10);
+
+ u32 receivedNumber = 0;
+ database.receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size)
+ {
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ receivedNumber = deserializer.extract<u32>();
+ sibs::SafeSerializer serializer;
+ serializer.add((u32)20);
+ return serializer;
+ });
+
+ u32 sendCustomMessageResponseNumber = 0;
+ database.sendCustomMessage(customMessageKey, move(messageToSendSerializer.getBuffer()), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size)
+ {
+ if(!gotResponse)
+ {
+ Log::error("Didn't get reponse!");
+ return true;
+ }
+
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ sendCustomMessageResponseNumber = deserializer.extract<u32>();
+ return false;
+ });
+
+
+ this_thread::sleep_for(chrono::seconds(3));
+ assertEquals((u32)10, receivedNumber);
+ assertEquals((u32)20, sendCustomMessageResponseNumber);
}
return 0;