diff options
author | dec05eba <dec05eba@protonmail.com> | 2018-02-03 19:33:05 +0100 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-08-18 23:25:12 +0200 |
commit | 5c1a20c4dacfe03db90b70c2665e66a76574196c (patch) | |
tree | 5d2d418f7ac0f84aba8439e75683b1f53f053465 /src | |
parent | 1c7e6e074155499155adbbb651db1c66f1762ba2 (diff) |
Add seed function, not yet finished
Diffstat (limited to 'src')
-rw-r--r-- | src/Database.cpp | 97 |
1 files changed, 91 insertions, 6 deletions
diff --git a/src/Database.cpp b/src/Database.cpp index 33aef54..c2d08cb 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -7,6 +7,8 @@ #include <thread> #include <chrono> #include <sibs/SafeSerializer.hpp> +#include <sibs/SafeDeserializer.hpp> +#include <cassert> using namespace dht; using namespace std; @@ -17,6 +19,10 @@ static int databaseCount = 0; static time_t timeOffset = 0; // Updated by comparing local time with ntp server static thread *ntpThread = nullptr; static bool timestampSynced = false; +static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); +static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); + +#define OPENDHT_INFOHASH_LEN 20 namespace odhtdb { @@ -66,18 +72,30 @@ namespace odhtdb node.join(); } + void Database::seed() + { + // TODO: Use cached files and seed those. If none exists, request new files to seed. + // If nobody requests my cached files in a long time, request new files to seed and remove cached files + // (only if there are plenty of other seeders for the cached files. This could also cause race issue + // where all nodes with a cached file delete it at same time) + + using std::placeholders::_1; + node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); + node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1)); + } + void Database::create(const Key &key, Group *primaryAdminGroup) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; - stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds }); + stagedCreateObjects.emplace_back(StagedCreateObject(key, primaryAdminGroup, timeMicroseconds)); } void Database::add(const Key &key, DataView data) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; - stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds }); + stagedAddObjects.emplace_back(StagedAddObject(key, data, timeMicroseconds)); } void Database::commit() @@ -103,7 +121,9 @@ namespace odhtdb // TODO: Use (ed25519 or poly1305) and curve25519 // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) sibs::SafeSerializer serializer; + assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add(stagedObject.timestamp); serializer.add((u8)stagedObject.primaryAdminGroup->getName().size()); serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size()); serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size()); @@ -113,9 +133,9 @@ namespace odhtdb serializer.add((u8*)user->getName().data(), user->getName().size()); } - // TODO: Verify if serializer buffer needs to survive longer than this method + // TODO: Verify if serializer buffer needs to survive longer than this scope Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); - node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + node.put(CREATE_DATA_HASH, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) @@ -127,8 +147,15 @@ namespace odhtdb { // TODO: Use (ed25519 or poly1305) and curve25519 // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) - Value value((const u8*)stagedObject.data.data, stagedObject.data.size); - node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + sibs::SafeSerializer serializer; + assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); + serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add(stagedObject.timestamp); + serializer.add((u8*)stagedObject.data.data, stagedObject.data.size); + + // TODO: Verify if serializer buffer needs to survive longer than this scope + Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); + node.put(ADD_DATA_HASH, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) @@ -144,4 +171,62 @@ namespace odhtdb timestamp.fractions = 0; // TODO: Set this return timestamp; } + + StagedCreateObject Database::deserializeCreateRequest(const std::shared_ptr<dht::Value> &value) + { + StagedCreateObject result; + + sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; + deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.timestamp = deserializer.extract<u64>(); + + return result; + } + + StagedAddObject Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value) + { + StagedAddObject result; + + sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; + deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); + result.timestamp = deserializer.extract<u64>(); + + return result; + } + + bool Database::listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values) + { + for(const shared_ptr<Value> &value : values) + { + try + { + StagedCreateObject createObject = deserializeCreateRequest(value); + } + catch (sibs::DeserializeException &e) + { + fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); + } + } + return true; + } + + bool Database::listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values) + { + for(const shared_ptr<Value> &value : values) + { + try + { + StagedAddObject addObject = deserializeAddRequest(value); + } + catch (sibs::DeserializeException &e) + { + fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); + } + } + return true; + } }
\ No newline at end of file |