diff options
author | dec05eba <0xdec05eba@gmail.com> | 2018-10-16 00:38:01 +0200 |
---|---|---|
committer | dec05eba <0xdec05eba@gmail.com> | 2018-10-16 00:39:21 +0200 |
commit | 13f2007d104149f69ab7a794d2e119830e638eaa (patch) | |
tree | 8dfffb5669d9db6f2328426f5a1cccb72275d92e | |
parent | 911e62afb82b140e368181f4966442cd5c2e1bd8 (diff) |
Replace opendht with sibs pubsub
This should fix issues with memory usage/leaks and make it easier
to get peers subscribed to the same key.
It will also be easier to modify and also works easier cross platform
because of no additional dependencies.
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | .gitmodules | 3 | ||||
-rw-r--r-- | .vscode/c_cpp_properties.json | 85 | ||||
-rw-r--r-- | .vscode/launch.json | 27 | ||||
-rw-r--r-- | .vscode/settings.json | 59 | ||||
-rw-r--r-- | README.md | 4 | ||||
-rw-r--r-- | compile_commands.json | 2 | ||||
m--------- | depends/sibs-pubsub | 0 | ||||
-rw-r--r-- | include/odhtdb/Database.hpp | 54 | ||||
-rw-r--r-- | include/odhtdb/DatabaseStorage.hpp | 12 | ||||
-rw-r--r-- | include/odhtdb/DhtKey.hpp | 12 | ||||
-rw-r--r-- | include/odhtdb/InfoHash.hpp | 39 | ||||
-rw-r--r-- | include/odhtdb/Key.hpp | 15 | ||||
-rw-r--r-- | project.conf | 8 | ||||
-rw-r--r-- | src/Database.cpp | 326 | ||||
-rw-r--r-- | src/DatabaseStorage.cpp | 69 | ||||
-rw-r--r-- | src/DhtKey.cpp | 8 | ||||
-rw-r--r-- | src/InfoHash.cpp | 37 | ||||
-rw-r--r-- | tests/main.cpp | 73 |
19 files changed, 281 insertions, 555 deletions
@@ -4,3 +4,6 @@ odhtdb.kdev4 storage/ .gdb_history compile_commands.json +.vscode/ +tests/sibs-build/ +tests/compile_commands.json diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..fc65db9 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "depends/sibs-pubsub"] + path = depends/sibs-pubsub + url = https://gitlab.com/DEC05EBA/sibs-pubsub.git diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json deleted file mode 100644 index 73ef30c..0000000 --- a/.vscode/c_cpp_properties.json +++ /dev/null @@ -1,85 +0,0 @@ -{ - "configurations": [ - { - "name": "Mac", - "includePath": [ - "/usr/include", - "/usr/local/include", - "${workspaceRoot}" - ], - "defines": [], - "intelliSenseMode": "clang-x64", - "browse": { - "path": [ - "/usr/include", - "/usr/local/include", - "${workspaceRoot}" - ], - "limitSymbolsToIncludedHeaders": true, - "databaseFilename": "" - }, - "macFrameworkPath": [ - "/System/Library/Frameworks", - "/Library/Frameworks" - ] - }, - { - "name": "Linux", - "includePath": [ - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1/x86_64-pc-linux-gnu", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1/backward", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include", - "/usr/local/include", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include-fixed", - "/usr/include", - "~/.sibs/lib/fmt/4.1.0", - "~/.sibs/lib/ntpclient/0.1.0/include", - "~/.sibs/lib/sibs-serializer/0.1.0/", - "${workspaceRoot}" - ], - "defines": [], - "intelliSenseMode": "clang-x64", - "browse": { - "path": [ - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1/x86_64-pc-linux-gnu", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/../../../../include/c++/7.2.1/backward", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include", - "/usr/local/include", - "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include-fixed", - "/usr/include", - "~/.sibs/lib/fmt/4.1.0", - "~/.sibs/lib/ntpclient/0.1.0/include", - "~/.sibs/lib/sibs-serializer/0.1.0/", - "${workspaceRoot}" - ], - "limitSymbolsToIncludedHeaders": true, - "databaseFilename": "" - }, - "cStandard": "c11", - "cppStandard": "c++17" - }, - { - "name": "Win32", - "includePath": [ - "C:/Program Files (x86)/Microsoft Visual Studio 14.0/VC/include", - "${workspaceRoot}" - ], - "defines": [ - "_DEBUG", - "UNICODE" - ], - "intelliSenseMode": "msvc-x64", - "browse": { - "path": [ - "C:/Program Files (x86)/Microsoft Visual Studio 14.0/VC/include/*", - "${workspaceRoot}" - ], - "limitSymbolsToIncludedHeaders": true, - "databaseFilename": "" - } - } - ], - "version": 4 -}
\ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index c0c2715..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - // Use IntelliSense to learn about possible attributes. - // Hover to view descriptions of existing attributes. - // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "(gdb) Launch", - "type": "cppdbg", - "request": "launch", - "program": "${workspaceFolder}/tests/sibs-build/debug/test", - "args": [], - "stopAtEntry": false, - "cwd": "${workspaceFolder}", - "environment": [], - "externalConsole": true, - "MIMode": "gdb", - "setupCommands": [ - { - "description": "Enable pretty-printing for gdb", - "text": "-enable-pretty-printing", - "ignoreFailures": true - } - ] - } - ] -}
\ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index aeb76fb..f5d622f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,60 +1,3 @@ { - "files.associations": { - "*.ipp": "cpp", - "typeinfo": "cpp", - "cctype": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "csignal": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "array": "cpp", - "atomic": "cpp", - "strstream": "cpp", - "*.tcc": "cpp", - "bitset": "cpp", - "chrono": "cpp", - "cinttypes": "cpp", - "codecvt": "cpp", - "complex": "cpp", - "condition_variable": "cpp", - "cstdint": "cpp", - "exception": "cpp", - "fstream": "cpp", - "functional": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "numeric": "cpp", - "optional": "cpp", - "ostream": "cpp", - "ratio": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "streambuf": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "thread": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "utility": "cpp", - "valarray": "cpp", - "__config": "cpp", - "__nullptr": "cpp", - "hash_map": "cpp", - "hash_set": "cpp" - } + "cquery.cacheDirectory": "${workspaceFolder}/.vscode/cquery_cached_index/" }
\ No newline at end of file @@ -5,11 +5,11 @@ Data is signed using ed25519, encrypted using xchacha20-poly1305 ietf and hashed See src/Encryption.cpp, src/Signature.cpp and src/Hash.cpp. Also check Scheme.md for packet construction. # Limits -Only 65kb of data can be used for each `add`. You can add more data by using `add` several times. +Only 800kb of data can be used for each `add`. You can add more data by using `add` several times. # TODO ## Data limit -Allow more than 65kb of data to be added at once +Allow more than 800kb of data to be added at once ## Node banning Ban nodes that spam put or get (malicious nodes). If data is routed, then the router node should first ban the malicious node so the router node is not banned if it's not malicious. But how can we know if data was routed? does opendht expose this to other nodes in some way? diff --git a/compile_commands.json b/compile_commands.json deleted file mode 100644 index 0d4f101..0000000 --- a/compile_commands.json +++ /dev/null @@ -1,2 +0,0 @@ -[ -] diff --git a/depends/sibs-pubsub b/depends/sibs-pubsub new file mode 160000 +Subproject eda94456add9a65d1821302e343bef4021d2a77 diff --git a/include/odhtdb/Database.hpp b/include/odhtdb/Database.hpp index 372b19c..a326398 100644 --- a/include/odhtdb/Database.hpp +++ b/include/odhtdb/Database.hpp @@ -1,7 +1,7 @@ #pragma once +#include "InfoHash.hpp" #include "types.hpp" -#include "Key.hpp" #include "DataView.hpp" #include "DatabaseStorage.hpp" #include "Hash.hpp" @@ -13,13 +13,13 @@ #include "OwnedMemory.hpp" #include "DatabaseOperation.hpp" #include "DatabaseOrder.hpp" -#include <opendht/dhtrunner.h> #include <vector> #include <ntp/NtpClient.hpp> #include <boost/filesystem/path.hpp> #include <sibs/SafeSerializer.hpp> #include <stdexcept> #include <functional> +#include <sibs/BootstrapConnection.hpp> namespace odhtdb { @@ -134,28 +134,11 @@ namespace odhtdb struct DatabaseSeedInfo { - std::shared_ptr<std::future<size_t>> newDataListenerFuture; - std::shared_ptr<std::future<size_t>> responseKeyFuture; - std::shared_ptr<std::future<size_t>> requestOldDataListenerFuture; + sibs::ListenHandle newDataListenHandle; + sibs::ListenHandle responseKeyListenHandle; + sibs::ListenHandle requestOldDataListenHandle; - std::shared_ptr<dht::InfoHash> reponseKeyInfoHash; - - DatabaseSeedInfo(){} - DatabaseSeedInfo(const DatabaseSeedInfo &other) - { - newDataListenerFuture = other.newDataListenerFuture; - responseKeyFuture = other.responseKeyFuture; - requestOldDataListenerFuture = other.requestOldDataListenerFuture; - reponseKeyInfoHash = other.reponseKeyInfoHash; - } - DatabaseSeedInfo& operator=(const DatabaseSeedInfo &other) - { - newDataListenerFuture = other.newDataListenerFuture; - responseKeyFuture = other.responseKeyFuture; - requestOldDataListenerFuture = other.requestOldDataListenerFuture; - reponseKeyInfoHash = other.reponseKeyInfoHash; - return *this; - } + std::shared_ptr<InfoHash> reponseKeyInfoHash; }; using CreateNodeCallbackFunc = std::function<void(const DatabaseCreateNodeRequest&)>; @@ -165,6 +148,8 @@ namespace odhtdb using ReceiveCustomMessageCallbackFunc = std::function<sibs::SafeSerializer(const void *data, usize size)>; using SendCustomMessageCallbackFunc = std::function<bool(bool gotResponse, const void *data, usize size)>; + using Value = std::vector<u8>; + struct DatabaseCallbackFuncs { CreateNodeCallbackFunc createNodeCallbackFunc; @@ -185,6 +170,7 @@ namespace odhtdb void stopSeeding(const Hash &nodeHash); void loadNode(const Hash &nodeHash, DatabaseLoadOrder loadOrder = DatabaseLoadOrder::OLDEST_FIRST); + // When a node is created, you automatically seed it as well so there is no need to call seed on it. // Throws DatabaseCreateException on failure. std::unique_ptr<DatabaseCreateResponse> create(); // Throws PermissionDeniedException if user @userToPerformActionWith is not allowed to add data to node @@ -213,26 +199,26 @@ namespace odhtdb // Returns -1 on failure int getUserLowestPermissionLevel(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const; - std::future<size_t> receiveCustomMessage(const dht::InfoHash &requestKey, ReceiveCustomMessageCallbackFunc callbackFunc); + sibs::ListenHandle receiveCustomMessage(const InfoHash &requestKey, ReceiveCustomMessageCallbackFunc callbackFunc); - void sendCustomMessage(const dht::InfoHash &key, std::vector<u8> &&data); + void sendCustomMessage(const InfoHash &key, const void *data, const usize size); // Return true in @callbackFunc if you want to continue listening for responses, otherwise return false - std::future<size_t> sendCustomMessage(const dht::InfoHash &key, std::vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc); + sibs::ListenHandle sendCustomMessage(const InfoHash &key, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc); - void cancelNodeListener(const dht::InfoHash &infoHash, std::future<size_t> &nodeListener); + void cancelNodeListener(sibs::ListenHandle &nodeListener); int clearCache(); - static dht::InfoHash getInfoHash(const void *data, usize size); + static InfoHash getInfoHash(const void *data, usize size); private: - void sendOldDataToPeer(const DatabaseNode nodeToSeed, const std::shared_ptr<dht::InfoHash> requestResponseInfoHash, const std::shared_ptr<dht::Value> value, usize valueOffset); - void deserializeCreateRequest(const std::shared_ptr<dht::Value> &value, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey); - void deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey); - bool listenCreateData(std::shared_ptr<dht::Value> value, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey); - bool listenAddData(std::shared_ptr<dht::Value> value, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey); + bool sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size); + void deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey); + void deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey); + bool listenCreateData(const void *data, const usize size, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey); + bool listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey); private: - dht::DhtRunner node; + sibs::BootstrapConnection bootstrapConnection; DatabaseStorage databaseStorage; std::function<void(const DatabaseCreateNodeRequest&)> onCreateNodeCallbackFunc; std::function<void(const DatabaseAddNodeRequest&)> onAddNodeCallbackFunc; diff --git a/include/odhtdb/DatabaseStorage.hpp b/include/odhtdb/DatabaseStorage.hpp index b84635c..b0c081e 100644 --- a/include/odhtdb/DatabaseStorage.hpp +++ b/include/odhtdb/DatabaseStorage.hpp @@ -17,9 +17,8 @@ #include <stdexcept> #include <boost/filesystem/path.hpp> #include <sibs/SafeDeserializer.hpp> -#include <opendht/crypto.h> -#include <opendht/dhtrunner.h> #include <functional> +#include <sibs/DirectConnection.hpp> class sqlite3; class sqlite3_stmt; @@ -141,13 +140,11 @@ namespace odhtdb std::pair<bool, std::shared_ptr<OwnedByteArray>> getNodeDecryptionKey(const Hash &nodeHash); void setNodeDecryptionKey(const Hash &nodeHash, const DataView &decryptionKey); - const std::vector<dht::NodeExport>& getRemoteNodes() const; - void setRemoteNodes(const std::vector<dht::NodeExport> &remoteNodes); + const std::vector<std::shared_ptr<sibs::DirectConnectionPeer>>& getRemotePeers() const; + void setRemotePeers(const std::vector<std::shared_ptr<sibs::DirectConnectionPeer>> &remoteNodes); std::vector<OwnedByteArray> getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const; - const dht::crypto::Identity& getIdentity() const; - // Update storage state (remove quarantine objects if they are too old, etc) void update(); @@ -203,7 +200,6 @@ namespace odhtdb boost::filesystem::path remoteNodesFilePath; u8 passwordSalt[PASSWORD_SALT_LEN]; - std::pair<std::shared_ptr<dht::crypto::PrivateKey>, std::shared_ptr<dht::crypto::Certificate>> identity; - std::vector<dht::NodeExport> remoteNodes; + std::vector<std::shared_ptr<sibs::DirectConnectionPeer>> remotePeers; }; } diff --git a/include/odhtdb/DhtKey.hpp b/include/odhtdb/DhtKey.hpp index 959bb65..8af2bd4 100644 --- a/include/odhtdb/DhtKey.hpp +++ b/include/odhtdb/DhtKey.hpp @@ -1,7 +1,7 @@ #pragma once #include "Hash.hpp" -#include <opendht/infohash.h> +#include "InfoHash.hpp" namespace odhtdb { @@ -9,13 +9,13 @@ namespace odhtdb { public: DhtKey(const Hash &key); - DhtKey(const dht::InfoHash &infoHash); + DhtKey(const InfoHash &infoHash); - dht::InfoHash getNewDataListenerKey(); - dht::InfoHash getRequestOldDataKey(); - dht::InfoHash getPingKey(); + InfoHash getNewDataListenerKey(); + InfoHash getRequestOldDataKey(); + InfoHash getPingKey(); private: - dht::InfoHash infoHash; + InfoHash infoHash; unsigned char firstByteOriginalValue; }; } diff --git a/include/odhtdb/InfoHash.hpp b/include/odhtdb/InfoHash.hpp new file mode 100644 index 0000000..aba0a24 --- /dev/null +++ b/include/odhtdb/InfoHash.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include <array> +#include <stdexcept> +#include <sibs/PubsubKey.hpp> +#include <cassert> +#include "types.hpp" + +namespace odhtdb +{ + class InfoHashException : public std::runtime_error + { + public: + InfoHashException(const std::string &errMsg) : std::runtime_error(errMsg) {} + }; + + class InfoHash + { + public: + InfoHash(); + InfoHash(const u8 *data, const size_t size); + + // Throws InfoHashException on error + static InfoHash generateHash(const u8 *data, const size_t size); + + u8& operator [] (size_t index) + { + assert(index < key.data.size()); + return key.data[index]; + } + + const sibs::PubsubKey& getKey() const { return key; } + + bool operator == (const InfoHash &other) const; + bool operator != (const InfoHash &other) const; + private: + sibs::PubsubKey key; + }; +}
\ No newline at end of file diff --git a/include/odhtdb/Key.hpp b/include/odhtdb/Key.hpp deleted file mode 100644 index 18971d1..0000000 --- a/include/odhtdb/Key.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once - -#include <opendht/infohash.h> - -namespace odhtdb -{ - class Key - { - public: - Key() {} - Key(const char *key) : hashedKey(dht::InfoHash::get(key)) {} - - dht::InfoHash hashedKey; - }; -} diff --git a/project.conf b/project.conf index f34affa..29c70e1 100644 --- a/project.conf +++ b/project.conf @@ -1,9 +1,8 @@ [package] name = "odhtdb" -version = "0.0.2" +version = "0.1.0" type = "library" -platforms = ["linux32", "linux64"] -tests = "tests" +platforms = ["linux", "win", "macos", "bsd"] [config] expose_include_dirs = ["include"] @@ -15,10 +14,9 @@ expose_include_dirs = ["include"] #lib = "mingw/x64/static/release" [dependencies] -opendht = "1.7.4" libsodium = "1.0.16" ntpclient = "0.3.0" -sibs-serializer = "1.0.1" +sibs-serializer = "2.0.0" boost-filesystem = "1.66.0" boost-uuid = "1.66.0" argon2 = "2017.12.27" diff --git a/src/Database.cpp b/src/Database.cpp index 564b3d6..2a7a510 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,7 +5,6 @@ #include "../include/odhtdb/bin2hex.hpp" #include "../include/odhtdb/Log.hpp" #include <boost/uuid/uuid_generators.hpp> -#include <opendht.h> #include <sodium/randombytes.h> #include <thread> #include <chrono> @@ -14,7 +13,6 @@ #include <cassert> #include <sys/time.h> -using namespace dht; using namespace std; using namespace chrono_literals; @@ -25,8 +23,6 @@ static odhtdb::u64 timeOffsetFraction = 0; static thread *ntpThread = nullptr; static bool timestampSynced = false; -const int OPENDHT_INFOHASH_LEN = 20; - namespace odhtdb { static boost::uuids::random_generator uuidGen; @@ -34,24 +30,6 @@ namespace odhtdb const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; - const int NODE_PUT_RETRY_TIMES = 3; - - static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value, int retryCounter = 0) - { - node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok) - { - if(!ok) - { - if(retryCounter < NODE_PUT_RETRY_TIMES) - { - Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES); - nodePutWithRetry(node, infoHash, value, retryCounter + 1); - } - else - Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES); - } - }); - } class RequestQuarantineException : public runtime_error { @@ -108,34 +86,13 @@ namespace odhtdb } Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : + bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)), databaseStorage(this, storageDir), onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc), onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc), onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc), shuttingDown(false) { - node.run(port , { - /*.dht_config = */{ - /*.node_config = */{ - /*.node_id = */{}, - /*.network = */0, - /*.is_bootstrap = */false, - /*.maintain_storage*/false - }, - /*.id = */databaseStorage.getIdentity() - }, - /*.threaded = */true, - /*.proxy_server = */"", - /*.push_node_id = */"" - }); - node.setStorageLimit(1024 * 1024 * 1); // 1 Megabyte - auto portStr = to_string(port); - node.bootstrap(bootstrapNodeAddr, portStr.c_str()); - const auto &remoteNodes = databaseStorage.getRemoteNodes(); - if(!remoteNodes.empty()) - node.bootstrap(remoteNodes); - Log::debug("Connecting to bootstrap node (%s) and %u other known nodes that we have connected to previously with port %d", bootstrapNodeAddr, remoteNodes.size(), port); - // TODO: Make this work for multiple threads initializing database at same time ++databaseCount; if(databaseCount == 1) @@ -207,7 +164,8 @@ namespace odhtdb if(shuttingDown) return; } - databaseStorage.setRemoteNodes(node.exportNodes()); + + databaseStorage.setRemotePeers(bootstrapConnection.getPeers()); saveIntervalMs = 30000; // 30 sec } }); @@ -219,7 +177,6 @@ namespace odhtdb --databaseCount; shuttingDown = true; remoteNodesSaveThread.join(); - node.join(); } struct ActionGap @@ -228,78 +185,80 @@ namespace odhtdb u64 range; }; - void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr<InfoHash> requestResponseInfoHash, const shared_ptr<Value> value, usize valueOffset) + bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size) { Log::debug("Request: Got request to send old data"); - try + + sibs::SafeDeserializer deserializer((const u8*)data, size); + u16 requestStructureVersion = deserializer.extract<u16>(); + if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) { - sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset); - bool userWantsCreateNode = deserializer.extract<u8>() == 1; - DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>(); - - if(userWantsCreateNode) + Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); + return true; + } + shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>(); + deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH); + + bool userWantsCreateNode = deserializer.extract<u8>() == 1; + DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>(); + + if(userWantsCreateNode) + { + Log::debug("Request: Peer wants CreateNode"); + databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { - Log::debug("Request: Peer wants CreateNode"); - databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) - { - Log::debug("Request: Sent create packet to requesting peer"); - shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - }); - } - - vector<unique_ptr<u8[]>> userPublicKeys; + Log::debug("Request: Sent create packet to requesting peer"); + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + }); + } + + vector<unique_ptr<u8[]>> userPublicKeys; + + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap<vector<ActionGap>> actionGaps; + while(!deserializer.empty()) + { + unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); + deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract<u64>(); + u64 actionGapRange = deserializer.extract<u64>(); - // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants - DataViewMap<vector<ActionGap>> actionGaps; - while(!deserializer.empty()) + DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + userPublicKeys.emplace_back(move(userPublicKeyRaw)); + } + + if(actionGaps.empty()) + Log::debug("No action gaps received, sending all data"); + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) { - unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); - deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - u64 actionGapStart = deserializer.extract<u64>(); - u64 actionGapRange = deserializer.extract<u64>(); - - DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); - userPublicKeys.emplace_back(move(userPublicKeyRaw)); + Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); + sendData = true; } - - if(actionGaps.empty()) - Log::debug("No action gaps received, sending all data"); - - // TODO(Performance improvement): Instead of sending several packets, combine them into one - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + else { - bool sendData = false; - auto actionGapsIt = actionGaps.find(creatorPublicKey); - if(actionGapsIt == actionGaps.end()) + for(const auto &userActionGaps : actionGapsIt->second) { - Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); - sendData = true; - } - else - { - for(const auto &userActionGaps : actionGapsIt->second) + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) { - if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) - { - Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); - sendData = true; - break; - } + Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); + sendData = true; + break; } } - - if(!sendData) return; - shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - this_thread::sleep_for(chrono::milliseconds(50)); - }, fetchOrder); - } - catch (std::exception &e) - { - Log::warn("Failed while serving peer, error: %s", e.what()); - } + } + + if(!sendData) return; + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + this_thread::sleep_for(chrono::milliseconds(50)); + }, fetchOrder); + return true; } void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder) @@ -322,93 +281,59 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value) + newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + Log::debug("Seed: New data listener received data..."); - const Hash requestHash(value->data.data(), value->data.size()); + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) return true; - //return listenCreateData(value, requestHash, encryptionKey); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.newDataListenerFuture = make_shared<future<size_t>>(move(newDataListenerFuture)); - u8 responseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN); - shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, OPENDHT_INFOHASH_LEN);; + u8 responseKey[sibs::PUBSUB_KEY_LENGTH]; + randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH); + shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, sibs::PUBSUB_KEY_LENGTH); newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value) + newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - if(value->data.size() == OPENDHT_INFOHASH_LEN) - { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - InfoHash pingResponseKey; - deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN); - shared_ptr<Value> responseValue = make_shared<Value>(); - nodePutWithRetry(&node, pingResponseKey, responseValue); + if(!peer) return true; - } - - const Hash requestHash(value->data.data(), value->data.size()); + + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) - return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey()); + return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey()); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture)); // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value) + newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + try { - static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u16 requestStructureVersion = deserializer.extract<u16>(); - if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) - { - Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - return true; - } - shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>(); - deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN); - if(*responseKeyShared == *requestResponseInfoHash) - { - Log::debug("Request: Ignorning request for old data from ourself"); - return true; - } - else - Log::debug("Request: Got request from somebody else"); - - u8 pingResponseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN); - InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN); - usize valueOffset = value->data.size() - deserializer.getSize(); - node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr<Value> _) - { - sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset); - return false; - }); - - shared_ptr<Value> pingRequest = make_shared<Value>(pingResponseKey, OPENDHT_INFOHASH_LEN); - nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest); + return sendOldDataToPeer(nodeToSeed, data, size); } catch (std::exception &e) { Log::warn("Failed while serving peer, error: %s", e.what()); + return true; } - return true; }); - newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture)); seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo; sibs::SafeSerializer serializer; serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - serializer.add(responseKey, OPENDHT_INFOHASH_LEN); + serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH); bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash()); serializer.add(iHaveCreateNode ? (u8)0 : (u8)1); serializer.add(fetchOrder); @@ -440,8 +365,7 @@ namespace odhtdb } Log::debug("Sending request for old data"); - shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue); + bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } void Database::stopSeeding(const Hash &nodeHash) @@ -449,11 +373,10 @@ namespace odhtdb auto seedInfoIt = seedInfoMap.find(nodeHash); if(seedInfoIt != seedInfoMap.end()) { - // TODO: Verify if doing get on listener future stalls program forever... Opendht documentation is not clear on this DhtKey dhtKey(nodeHash); - node.cancelListen(dhtKey.getNewDataListenerKey(), seedInfoIt->second.newDataListenerFuture->get()); - node.cancelListen(dhtKey.getRequestOldDataKey(), seedInfoIt->second.requestOldDataListenerFuture->get()); - node.cancelListen(*seedInfoIt->second.reponseKeyInfoHash, seedInfoIt->second.responseKeyFuture->get()); + bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle); seedInfoMap.erase(seedInfoIt); } } @@ -488,9 +411,11 @@ namespace odhtdb databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size)); databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size()); + DatabaseNode databaseNode = { encryptionKey, hashRequestKey }; + seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); + DhtKey dhtKey(*hashRequestKey); - shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -522,8 +447,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -551,8 +475,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() @@ -571,9 +494,9 @@ namespace odhtdb return timestamp; } - void Database::deserializeCreateRequest(const shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) + void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); u16 packetStructureVersion = deserializer.extract<u16>(); if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION) { @@ -605,12 +528,12 @@ namespace odhtdb uint8_t adminGroupId[GROUP_ID_LENGTH]; deserializer.extract(adminGroupId, GROUP_ID_LENGTH); - databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, value->data.data(), value->data.size()); + databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size); } - void Database::deserializeAddRequest(const shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) + void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); @@ -647,10 +570,10 @@ namespace odhtdb u64 newActionCounter = deserializerUnsigned.extract<u64>(); DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); - databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); + databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView); } - bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) + bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) { Log::debug("Got create data in node %s", hash.toString().c_str()); try @@ -659,7 +582,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesNodeExist(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); - deserializeCreateRequest(value, hash, encryptionKey); + deserializeCreateRequest(data, size, hash, encryptionKey); } catch (exception &e) { @@ -668,7 +591,7 @@ namespace odhtdb return true; } - bool Database::listenAddData(shared_ptr<dht::Value> value, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) + bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) { Log::debug("Got add data in node %s", nodeHash->toString().c_str()); try @@ -677,7 +600,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesDataExist(requestDataHash)) throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)"); - deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey); + deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey); } catch (RequestQuarantineException &e) { @@ -720,47 +643,44 @@ namespace odhtdb return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey); } - std::future<size_t> Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = receiveMessageKey; + InfoHash responseKey = receiveMessageKey; ++responseKey[0]; - return node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value) + return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); + sibs::SafeSerializer serializer = callbackFunc(data, size); if(!serializer.getBuffer().empty()) { - shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, responseKey, responseValue); + bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } return true; }); } - void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data) + void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size) { - shared_ptr<Value> value = make_shared<Value>(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); } - std::future<size_t> Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = requestKey; + InfoHash responseKey = requestKey; ++responseKey[0]; - auto listener = node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value) + auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - return callbackFunc(true, value->data.data(), value->data.size()); + return callbackFunc(true, data, size); }); - shared_ptr<Value> value = make_shared<Value>(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); return listener; } - void Database::cancelNodeListener(const dht::InfoHash &infoHash, std::future<size_t> &nodeListener) + void Database::cancelNodeListener(sibs::ListenHandle &nodeListener) { - node.cancelListen(infoHash, nodeListener.get()); + bootstrapConnection.cancelListen(nodeListener); } int Database::clearCache() @@ -768,8 +688,8 @@ namespace odhtdb return databaseStorage.clearCache(); } - dht::InfoHash Database::getInfoHash(const void *data, usize size) + InfoHash Database::getInfoHash(const void *data, usize size) { - return dht::InfoHash::get((const u8*)data, size); + return InfoHash::generateHash((const u8*)data, size); } } diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index 9398254..14b74e8 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -158,18 +158,6 @@ namespace odhtdb metadataSerializer.add(STORAGE_VERSION); randombytes_buf(passwordSalt, PASSWORD_SALT_LEN); metadataSerializer.add(passwordSalt, PASSWORD_SALT_LEN); - - identity = dht::crypto::generateIdentity(); - dht::Blob privateKeyData = identity.first->serialize(); - metadataSerializer.add((u16)privateKeyData.size()); - metadataSerializer.add(privateKeyData.data(), privateKeyData.size()); - - dht::Blob certificateData; - identity.second->pack(certificateData); - metadataSerializer.add((u16)certificateData.size()); - metadataSerializer.add(certificateData.data(), certificateData.size()); - - fileAppend(metadataFilePath, { metadataSerializer.getBuffer().data(), metadataSerializer.getBuffer().size() }); } catch(sibs::DeserializeException &e) { @@ -335,37 +323,13 @@ namespace odhtdb throw std::runtime_error("Wrong storage version!"); deserializer.extract(passwordSalt, PASSWORD_SALT_LEN); - - u16 privateKeySize = deserializer.extract<u16>(); - dht::Blob privateKeyRaw; - privateKeyRaw.resize(privateKeySize); - deserializer.extract(&privateKeyRaw[0], privateKeySize); - identity.first = make_shared<dht::crypto::PrivateKey>(privateKeyRaw); - - u16 certificateSize = deserializer.extract<u16>(); - dht::Blob certificateRaw; - certificateRaw.resize(certificateSize); - deserializer.extract(&certificateRaw[0], certificateSize); - identity.second = make_shared<dht::crypto::Certificate>(certificateRaw); - assert(deserializer.empty()); } void DatabaseStorage::loadRemoteNodesFromFile() { OwnedByteArray remoteNodesFileContent = fileGetContent(remoteNodesFilePath); - msgpack::unpacker pac; - pac.reserve_buffer(remoteNodesFileContent.size); - memcpy(pac.buffer(), remoteNodesFileContent.data, remoteNodesFileContent.size); - pac.buffer_consumed(remoteNodesFileContent.size); - - msgpack::object_handle oh; - while(pac.next(oh)) - { - auto importedNodes = oh.get().as<vector<dht::NodeExport>>(); - remoteNodes.reserve(remoteNodes.size() + importedNodes.size()); - remoteNodes.insert(remoteNodes.end(), importedNodes.begin(), importedNodes.end()); - } + remotePeers = sibs::DirectConnectionsUtils::deserializePeers(remoteNodesFileContent.data, remoteNodesFileContent.size); } static void sqlite_step_throw_on_failure(sqlite3 *db, sqlite3_stmt *stmt, const char *description) @@ -1025,29 +989,17 @@ namespace odhtdb decryptNodeData(nodeHash, nodeDecryptionKeyResult.second); } - struct RemoteNodePacker + const vector<std::shared_ptr<sibs::DirectConnectionPeer>>& DatabaseStorage::getRemotePeers() const { - sibs::SafeSerializer serializer; - - RemoteNodePacker& write(const char *data, size_t size) - { - serializer.add((const u8*)data, size); - return *this; - } - }; - - const vector<dht::NodeExport>& DatabaseStorage::getRemoteNodes() const - { - return remoteNodes; + return remotePeers; } - void DatabaseStorage::setRemoteNodes(const std::vector<dht::NodeExport> &remoteNodes) + void DatabaseStorage::setRemotePeers(const std::vector<std::shared_ptr<sibs::DirectConnectionPeer>> &remotePeers) { - Log::debug("Storing %u remote nodes", remoteNodes.size()); - this->remoteNodes = remoteNodes; - RemoteNodePacker remoteNodePacker; - msgpack::pack(remoteNodePacker, remoteNodes); - fileOverwrite(remoteNodesFilePath, DataView(remoteNodePacker.serializer.getBuffer().data(), remoteNodePacker.serializer.getBuffer().size())); + Log::debug("Storing %u remote peers", remotePeers.size()); + this->remotePeers = remotePeers; + std::vector<u8> serializedPeers = sibs::DirectConnectionsUtils::serializePeers(remotePeers); + fileOverwrite(remoteNodesFilePath, DataView(serializedPeers.data(), serializedPeers.size())); } vector<OwnedByteArray> DatabaseStorage::getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const @@ -1292,11 +1244,6 @@ namespace odhtdb return true; } - const dht::crypto::Identity& DatabaseStorage::getIdentity() const - { - return identity; - } - void DatabaseStorage::update() { auto time = chrono::high_resolution_clock::now().time_since_epoch(); diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp index 1508657..b84dbb7 100644 --- a/src/DhtKey.cpp +++ b/src/DhtKey.cpp @@ -8,24 +8,24 @@ namespace odhtdb firstByteOriginalValue = infoHash[0]; } - DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash) + DhtKey::DhtKey(const InfoHash &_infoHash) : infoHash(_infoHash) { firstByteOriginalValue = infoHash[0]; } - dht::InfoHash DhtKey::getNewDataListenerKey() + InfoHash DhtKey::getNewDataListenerKey() { infoHash[0] = firstByteOriginalValue; return infoHash; } - dht::InfoHash DhtKey::getRequestOldDataKey() + InfoHash DhtKey::getRequestOldDataKey() { infoHash[0] = firstByteOriginalValue + 1; return infoHash; } - dht::InfoHash DhtKey::getPingKey() + InfoHash DhtKey::getPingKey() { infoHash[0] = firstByteOriginalValue + 10; return infoHash; diff --git a/src/InfoHash.cpp b/src/InfoHash.cpp new file mode 100644 index 0000000..90e6e31 --- /dev/null +++ b/src/InfoHash.cpp @@ -0,0 +1,37 @@ +#include "../include/odhtdb/InfoHash.hpp" +#include <sodium/crypto_generichash_blake2b.h> +#include <sodium/core.h> +#include <algorithm> + +namespace odhtdb +{ + InfoHash::InfoHash() + { + + } + + InfoHash::InfoHash(const u8 *data, const size_t size) : + key(data, size) + { + + } + + InfoHash InfoHash::generateHash(const u8 *data, const size_t size) + { + InfoHash infoHash; + int result = crypto_generichash_blake2b((unsigned char*)&infoHash.key.data[0], infoHash.key.data.size(), (const unsigned char*)data, size, nullptr, 0); + if(result < 0) + throw InfoHashException("Failed to hash data using blake2b"); + return infoHash; + } + + bool InfoHash::operator == (const InfoHash &other) const + { + return key == other.key; + } + + bool InfoHash::operator != (const InfoHash &other) const + { + return !(*this == other); + } +}
\ No newline at end of file diff --git a/tests/main.cpp b/tests/main.cpp index 17348c4..1541d2a 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -10,15 +10,17 @@ #include <vector> #include <chrono> #include <thread> -#include <opendht.h> #include <boost/filesystem.hpp> #include <map> +#include <sibs/BootstrapNode.hpp> using namespace std; using namespace chrono_literals; using namespace odhtdb; -void testBinHexConvert() +static const u16 PORT = 22111; + +static void testBinHexConvert() { DataView input { (void*)"hello", 5 }; @@ -41,7 +43,7 @@ void testBinHexConvert() } } -void testHash() +static void testHash() { Hash hash("odhtdb", 6); string hashHex = hash.toString(); @@ -49,7 +51,7 @@ void testHash() Log::debug("hash of 'odhtdb' is: a7b30ec8ab92de60e551b26bb8f78d315697f84dd7f5549a143477e095ec934f"); } -void testSignData(const Signature::KeyPair &localUserKeyPair) +static void testSignData(const Signature::KeyPair &localUserKeyPair) { std::string publicKeyStr = localUserKeyPair.getPublicKey().toString(); Log::debug("Local user public key: %s", publicKeyStr.c_str()); @@ -99,7 +101,7 @@ void testSignData(const Signature::KeyPair &localUserKeyPair) } } -void testEncryption() +static void testEncryption() { const char *message = "hello, world!"; const unsigned long long messageLength = 13; @@ -110,27 +112,7 @@ void testEncryption() assertEquals(0, strncmp(message, (const char*)decryption.getDecryptedText().data, messageLength)); } -void testCachedIdentity() -{ - pair<shared_ptr<dht::crypto::PrivateKey>, shared_ptr<dht::crypto::Certificate>> identity = dht::crypto::generateIdentity(); - dht::Blob privateKeyData = identity.first->serialize(); - Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); - - dht::crypto::PrivateKey privateKeyDeserialized(privateKeyData); - privateKeyData = identity.first->serialize(); - Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); - - dht::Blob certificateData; - identity.second->pack(certificateData); - Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); - - dht::crypto::Certificate certificateDeserialized(certificateData); - certificateData.clear(); - identity.second->pack(certificateData); - Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); -} - -void testTimestamp(const Database &database) +static void testTimestamp(const Database &database) { auto timestamp1 = database.getSyncedTimestampUtc(); this_thread::sleep_for(chrono::milliseconds(100)); @@ -142,9 +124,8 @@ void testTimestamp(const Database &database) fail("Second timestamp is not more than first one for some reason"); } -void testStandard() +static void testStandard() { - testCachedIdentity(); testBinHexConvert(); testHash(); testEncryption(); @@ -182,8 +163,7 @@ void testStandard() Signature::KeyPair localUserKeyPair; testSignData(localUserKeyPair); - // TODO: Setup local bootstrap node for tests - Database database("bootstrap.ring.cx", 4222, storagePath, callbackFuncs); + Database database("127.0.0.1", PORT, storagePath, callbackFuncs); testTimestamp(database); auto databaseCreateResponse = database.create(); @@ -192,8 +172,6 @@ void testStandard() database.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); database.addUser(databaseNode, *adminUserKey, localUserKeyPair.getPublicKey(), databaseCreateResponse->getNodeAdminGroupId()->getView()); database.addData(databaseNode, localUserKeyPair, DataView{ (void*)"hello, aaald!", 13 }); - - database.seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); this_thread::sleep_for(chrono::seconds(3)); assertEquals(1, createNodeCounter); @@ -259,8 +237,7 @@ void testStandard() addDataCounter = 0; addUserCounter = 0; - // TODO: Setup local bootstrap node for tests - Database database("bootstrap.ring.cx", 4222, storagePath, callbackFuncs); + Database database("127.0.0.1", PORT, storagePath, callbackFuncs); database.loadNode(*databaseNode.getRequestHash()); database.seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); @@ -270,7 +247,7 @@ void testStandard() assertEquals(2, addDataCounter); assertEquals(1, addUserCounter); - dht::InfoHash customMessageKey = Database::getInfoHash("asdf", 4); + InfoHash customMessageKey = Database::getInfoHash("asdf", 4); sibs::SafeSerializer messageToSendSerializer; messageToSendSerializer.add((u32)10); @@ -285,7 +262,7 @@ void testStandard() }); u32 sendCustomMessageResponseNumber = 0; - database.sendCustomMessage(customMessageKey, move(messageToSendSerializer.getBuffer()), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size) + database.sendCustomMessage(customMessageKey, messageToSendSerializer.getBuffer().data(), messageToSendSerializer.getBuffer().size(), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size) { if(!gotResponse) { @@ -305,7 +282,7 @@ void testStandard() } } -void testTwoLocalNodes() +static void testTwoLocalNodes() { boost::filesystem::path storagePath1("/tmp/odhtdbTest1"); boost::filesystem::remove_all(storagePath1); @@ -332,18 +309,19 @@ void testTwoLocalNodes() DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database1("bootstrap.ring.cx", 4222, storagePath1, callbackFuncs); + Database database1("127.0.0.1", PORT, storagePath1, callbackFuncs); auto databaseCreateResponse = database1.create(); DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); database1.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); - database1.seed(databaseNode); - Database database2("bootstrap.ring.cx", 4223, storagePath2, callbackFuncs); + Database database2("127.0.0.1", PORT, storagePath2, callbackFuncs); database2.seed(databaseNode); + + this_thread::sleep_for(chrono::seconds(5)); } -void testMemoryUsage() +static void testMemoryUsage() { boost::filesystem::path storagePath("/tmp/odhtdbTestMemoryUsage"); boost::filesystem::remove_all(storagePath); @@ -366,11 +344,10 @@ void testMemoryUsage() DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database("bootstrap.ring.cx", 4222, storagePath, callbackFuncs); + Database database("127.0.0.1", PORT, storagePath, callbackFuncs); auto databaseCreateResponse = database.create(); DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); - database.seed(databaseNode); const char *msg = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."; const usize msgLength = strlen(msg); @@ -384,7 +361,7 @@ void testMemoryUsage() } } -pair<bool, shared_ptr<OwnedByteArray>> __attribute__((optimize("O0"))) getDataNoCopy() +static pair<bool, shared_ptr<OwnedByteArray>> __attribute__((optimize("O0"))) getDataNoCopy() { u8 *decryptionKeyRawCopy = new u8[1024 * 1024 * 64]; memcpy(decryptionKeyRawCopy, "hello, world!", 14); @@ -392,7 +369,7 @@ pair<bool, shared_ptr<OwnedByteArray>> __attribute__((optimize("O0"))) getDataNo return make_pair(true, decryptionKey); } -void __attribute__((optimize("O0"))) testMemoryLeak() +static void __attribute__((optimize("O0"))) testMemoryLeak() { { auto data = getDataNoCopy(); @@ -408,6 +385,10 @@ struct Test int main(int argc, char **argv) { + sibs::BootstrapNode bootstrapNode(sibs::Ipv4(nullptr, PORT)); + // Wait until bootstrap node is ready to accept connections + this_thread::sleep_for(chrono::seconds(2)); + map<string, Test> testByName; testByName["standard"] = { testStandard, false }; testByName["two_local_nodes"] = { testTwoLocalNodes, false }; @@ -427,6 +408,7 @@ int main(int argc, char **argv) { Log::debug("Running test: %s", testIt.first.c_str()); testIt.second.testFunc(); + Log::debug("Finished test: %s", testIt.first.c_str()); } } } @@ -441,6 +423,7 @@ int main(int argc, char **argv) Log::debug("Running test: %s", testIt->first.c_str()); testIt->second.testFunc(); + Log::debug("Finished test: %s", testIt->first.c_str()); } return 0; |