diff options
-rw-r--r-- | include/odhtdb/Database.hpp | 11 | ||||
-rw-r--r-- | include/odhtdb/DhtKey.hpp | 5 | ||||
-rw-r--r-- | project.conf | 2 | ||||
-rw-r--r-- | src/Database.cpp | 50 | ||||
-rw-r--r-- | src/DhtKey.cpp | 9 | ||||
-rw-r--r-- | tests/main.cpp | 33 |
6 files changed, 102 insertions, 8 deletions
diff --git a/include/odhtdb/Database.hpp b/include/odhtdb/Database.hpp index a68c601..bd7b3f5 100644 --- a/include/odhtdb/Database.hpp +++ b/include/odhtdb/Database.hpp @@ -17,6 +17,7 @@ #include <vector> #include <ntp/NtpClient.hpp> #include <boost/filesystem/path.hpp> +#include <sibs/SafeSerializer.hpp> #include <stdexcept> #include <functional> @@ -146,6 +147,9 @@ namespace odhtdb using CreateNodeCallbackFunc = std::function<void(const DatabaseCreateNodeRequest&)>; using AddNodeCallbackFunc = std::function<void(const DatabaseAddNodeRequest&)>; using AddUserCallbackFunc = std::function<void(const DatabaseAddUserRequest&)>; + + using ReceiveCustomMessageCallbackFunc = std::function<sibs::SafeSerializer(const void *data, usize size)>; + using SendCustomMessageCallbackFunc = std::function<bool(bool gotResponse, const void *data, usize size)>; struct DatabaseCallbackFuncs { @@ -191,6 +195,13 @@ namespace odhtdb std::vector<NodeUserKeyPair> getStoredUserNodeDataDecrypted(const std::string &username, const std::string &password); std::vector<OwnedMemory> getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const; + + void receiveCustomMessage(const dht::InfoHash &requestKey, ReceiveCustomMessageCallbackFunc callbackFunc); + + // Return true in @callbackFunc if you want to continue listening for responses, otherwise return false + void sendCustomMessage(const dht::InfoHash &key, std::vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc); + + static dht::InfoHash getInfoHash(const void *data, usize size); private: void deserializeCreateRequest(const std::shared_ptr<dht::Value> &value, const Hash &hash, const std::shared_ptr<OwnedMemory> encryptionKey); void deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const std::shared_ptr<OwnedMemory> encryptionKey); diff --git a/include/odhtdb/DhtKey.hpp b/include/odhtdb/DhtKey.hpp index 7c30ee3..c3398bf 100644 --- a/include/odhtdb/DhtKey.hpp +++ b/include/odhtdb/DhtKey.hpp @@ -9,9 +9,10 @@ namespace odhtdb { public: DhtKey(const Hash &key); + DhtKey(const dht::InfoHash &infoHash); - const dht::InfoHash& getNewDataListenerKey(); - const dht::InfoHash& getRequestOldDataKey(); + dht::InfoHash getNewDataListenerKey(); + dht::InfoHash getRequestOldDataKey(); private: dht::InfoHash infoHash; unsigned char firstByteOriginalValue; diff --git a/project.conf b/project.conf index 6d45e26..b3e01cc 100644 --- a/project.conf +++ b/project.conf @@ -12,7 +12,7 @@ expose_include_dirs = ["include"] opendht = "1.7.0" libsodium = "1.0.16" ntpclient = "0.3.0" -sibs-serializer = "1.0.0" +sibs-serializer = "1.0.1" boost-filesystem = "1.66.0" boost-uuid = "1.66.0" argon2 = "2017.12.27" diff --git a/src/Database.cpp b/src/Database.cpp index 241d9a5..5b3f705 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -215,7 +215,7 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> &value) + auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value) { Log::debug("Seed: New data listener received data..."); const Hash requestHash(value->data.data(), value->data.size()); @@ -233,7 +233,7 @@ namespace odhtdb newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> &value) + auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value) { const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *nodeToSeed.getRequestHash()) @@ -245,7 +245,7 @@ namespace odhtdb // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> &value) + auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value) { Log::debug("Request: Got request to send old data"); try @@ -670,4 +670,48 @@ namespace odhtdb { return databaseStorage.getUserGroups(nodeHash, userPublicKey); } + + void Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) + { + dht::InfoHash responseKey = receiveMessageKey; + ++responseKey[0]; + + node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value) + { + sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); + if(!serializer.getBuffer().empty()) + { + Value responseValue(move(serializer.getBuffer())); + node.put(responseKey, move(responseValue), [](bool ok) + { + if(!ok) + Log::error("Failed to respond to custom message"); + }); + } + return true; + }); + } + + void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc) + { + dht::InfoHash responseKey = requestKey; + ++responseKey[0]; + + node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value) + { + return callbackFunc(true, value->data.data(), value->data.size()); + }); + + Value value(move(data)); + node.put(requestKey, move(value), [](bool ok) + { + if(!ok) + Log::error("Failed to send custom message"); + }); + } + + dht::InfoHash Database::getInfoHash(const void *data, usize size) + { + return dht::InfoHash((const u8*)data, size); + } } diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp index 7bd6acf..422b715 100644 --- a/src/DhtKey.cpp +++ b/src/DhtKey.cpp @@ -8,13 +8,18 @@ namespace odhtdb firstByteOriginalValue = infoHash[0]; } - const dht::InfoHash& DhtKey::getNewDataListenerKey() + DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash) + { + firstByteOriginalValue = infoHash[0]; + } + + dht::InfoHash DhtKey::getNewDataListenerKey() { infoHash[0] = firstByteOriginalValue; return infoHash; } - const dht::InfoHash& DhtKey::getRequestOldDataKey() + dht::InfoHash DhtKey::getRequestOldDataKey() { infoHash[0] = firstByteOriginalValue + 1; return infoHash; diff --git a/tests/main.cpp b/tests/main.cpp index dd56164..e819684 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -266,6 +266,39 @@ int main() assertEquals(1, createNodeCounter); assertEquals(2, addDataCounter); assertEquals(1, addUserCounter); + + dht::InfoHash customMessageKey = Database::getInfoHash("asdf", 4); + sibs::SafeSerializer messageToSendSerializer; + messageToSendSerializer.add((u32)10); + + u32 receivedNumber = 0; + database.receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size) + { + sibs::SafeDeserializer deserializer((const u8*)data, size); + receivedNumber = deserializer.extract<u32>(); + sibs::SafeSerializer serializer; + serializer.add((u32)20); + return serializer; + }); + + u32 sendCustomMessageResponseNumber = 0; + database.sendCustomMessage(customMessageKey, move(messageToSendSerializer.getBuffer()), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size) + { + if(!gotResponse) + { + Log::error("Didn't get reponse!"); + return true; + } + + sibs::SafeDeserializer deserializer((const u8*)data, size); + sendCustomMessageResponseNumber = deserializer.extract<u32>(); + return false; + }); + + + this_thread::sleep_for(chrono::seconds(3)); + assertEquals((u32)10, receivedNumber); + assertEquals((u32)20, sendCustomMessageResponseNumber); } return 0; |