diff options
-rw-r--r-- | include/Database.hpp | 20 | ||||
-rw-r--r-- | include/Key.hpp | 1 | ||||
-rw-r--r-- | include/StagedObject.hpp | 38 | ||||
-rw-r--r-- | src/Database.cpp | 97 | ||||
-rw-r--r-- | tests/main.cpp | 2 |
5 files changed, 137 insertions, 21 deletions
diff --git a/include/Database.hpp b/include/Database.hpp index 20d7513..68fff62 100644 --- a/include/Database.hpp +++ b/include/Database.hpp @@ -2,6 +2,7 @@ #include "types.hpp" #include "Key.hpp" +#include "StagedObject.hpp" #include "DataView.hpp" #include <opendht/dhtrunner.h> #include <vector> @@ -11,26 +12,13 @@ namespace odhtdb { class Group; - struct StagedCreateObject - { - Key key; - Group *primaryAdminGroup; - u64 timestamp; // In microseconds - }; - - struct StagedAddObject - { - Key key; - DataView data; - u64 timestamp; // In microseconds - }; - class Database { public: Database(const char *bootstrapNodeAddr, u16 port); ~Database(); + void seed(); void create(const Key &key, Group *primaryAdminGroup); void add(const Key &key, DataView data); void commit(); @@ -38,6 +26,10 @@ namespace odhtdb void commitStagedCreateObject(const StagedCreateObject &stagedObject); void commitStagedAddObject(const StagedAddObject &stagedObject); ntp::NtpTimestamp getSyncedTimestampUtc() const; + StagedCreateObject deserializeCreateRequest(const std::shared_ptr<dht::Value> &value); + StagedAddObject deserializeAddRequest(const std::shared_ptr<dht::Value> &value); + bool listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values); + bool listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values); private: dht::DhtRunner node; std::vector<StagedCreateObject> stagedCreateObjects; diff --git a/include/Key.hpp b/include/Key.hpp index e9f0591..505050d 100644 --- a/include/Key.hpp +++ b/include/Key.hpp @@ -7,6 +7,7 @@ namespace odhtdb class Key { public: + Key() {} Key(const char *key) : hashedKey(dht::InfoHash::get(key)) {} dht::InfoHash hashedKey; diff --git a/include/StagedObject.hpp b/include/StagedObject.hpp new file mode 100644 index 0000000..61e1073 --- /dev/null +++ b/include/StagedObject.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include "Key.hpp" +#include "types.hpp" +#include "DataView.hpp" + +namespace odhtdb +{ + class Group; + + struct StagedCreateObject + { + Key key; + Group *primaryAdminGroup; + u64 timestamp; // In microseconds + + StagedCreateObject() : key(), primaryAdminGroup(nullptr), timestamp(0) {} + StagedCreateObject(const Key &_key, Group *_primaryAdminGroup, u64 _timestamp) : + key(_key), primaryAdminGroup(_primaryAdminGroup), timestamp(_timestamp) + { + + } + }; + + struct StagedAddObject + { + Key key; + DataView data; + u64 timestamp; // In microseconds + + StagedAddObject() : key(), data(), timestamp(0) {} + StagedAddObject(const Key &_key, const DataView &_data, u64 _timestamp) : + key(_key), data(_data), timestamp(_timestamp) + { + + } + }; +}
\ No newline at end of file diff --git a/src/Database.cpp b/src/Database.cpp index 33aef54..c2d08cb 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -7,6 +7,8 @@ #include <thread> #include <chrono> #include <sibs/SafeSerializer.hpp> +#include <sibs/SafeDeserializer.hpp> +#include <cassert> using namespace dht; using namespace std; @@ -17,6 +19,10 @@ static int databaseCount = 0; static time_t timeOffset = 0; // Updated by comparing local time with ntp server static thread *ntpThread = nullptr; static bool timestampSynced = false; +static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); +static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); + +#define OPENDHT_INFOHASH_LEN 20 namespace odhtdb { @@ -66,18 +72,30 @@ namespace odhtdb node.join(); } + void Database::seed() + { + // TODO: Use cached files and seed those. If none exists, request new files to seed. + // If nobody requests my cached files in a long time, request new files to seed and remove cached files + // (only if there are plenty of other seeders for the cached files. This could also cause race issue + // where all nodes with a cached file delete it at same time) + + using std::placeholders::_1; + node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); + node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1)); + } + void Database::create(const Key &key, Group *primaryAdminGroup) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; - stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds }); + stagedCreateObjects.emplace_back(StagedCreateObject(key, primaryAdminGroup, timeMicroseconds)); } void Database::add(const Key &key, DataView data) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; - stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds }); + stagedAddObjects.emplace_back(StagedAddObject(key, data, timeMicroseconds)); } void Database::commit() @@ -103,7 +121,9 @@ namespace odhtdb // TODO: Use (ed25519 or poly1305) and curve25519 // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) sibs::SafeSerializer serializer; + assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add(stagedObject.timestamp); serializer.add((u8)stagedObject.primaryAdminGroup->getName().size()); serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size()); serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size()); @@ -113,9 +133,9 @@ namespace odhtdb serializer.add((u8*)user->getName().data(), user->getName().size()); } - // TODO: Verify if serializer buffer needs to survive longer than this method + // TODO: Verify if serializer buffer needs to survive longer than this scope Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); - node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + node.put(CREATE_DATA_HASH, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) @@ -127,8 +147,15 @@ namespace odhtdb { // TODO: Use (ed25519 or poly1305) and curve25519 // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) - Value value((const u8*)stagedObject.data.data, stagedObject.data.size); - node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + sibs::SafeSerializer serializer; + assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); + serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add(stagedObject.timestamp); + serializer.add((u8*)stagedObject.data.data, stagedObject.data.size); + + // TODO: Verify if serializer buffer needs to survive longer than this scope + Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); + node.put(ADD_DATA_HASH, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) @@ -144,4 +171,62 @@ namespace odhtdb timestamp.fractions = 0; // TODO: Set this return timestamp; } + + StagedCreateObject Database::deserializeCreateRequest(const std::shared_ptr<dht::Value> &value) + { + StagedCreateObject result; + + sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; + deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.timestamp = deserializer.extract<u64>(); + + return result; + } + + StagedAddObject Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value) + { + StagedAddObject result; + + sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; + deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.timestamp = deserializer.extract<u64>(); + + return result; + } + + bool Database::listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values) + { + for(const shared_ptr<Value> &value : values) + { + try + { + StagedCreateObject createObject = deserializeCreateRequest(value); + } + catch (sibs::DeserializeException &e) + { + fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); + } + } + return true; + } + + bool Database::listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values) + { + for(const shared_ptr<Value> &value : values) + { + try + { + StagedAddObject addObject = deserializeAddRequest(value); + } + catch (sibs::DeserializeException &e) + { + fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); + } + } + return true; + } }
\ No newline at end of file diff --git a/tests/main.cpp b/tests/main.cpp index 2618d18..5ddcbe9 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -10,9 +10,9 @@ int main() // TODO: For tests, dont run against bootstrap.ring.cx. // Run against a bootstrap node made only for testing which doesn't persist added data. Database database("bootstrap.ring.cx", 4222); + database.seed(); LocalUser *localUser = LocalUser::create("dec05eba"); - Group group("admin"); group.addUser(localUser); database.create("galax.channel.latenight.chat", &group); |