aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAleksi Lindeman <aleksi_888@hotmail.com>2018-02-03 19:33:05 +0100
committerAleksi Lindeman <aleksi_888@hotmail.com>2018-02-03 19:33:11 +0100
commit28efc0068f47ec787791a07a63d720710068c095 (patch)
treebb831558de90002c8c774cd58220dbc0baeb7a12 /src
parent6f1089db78f14b52b869f5aaa979e52ff5e4c2d7 (diff)
Add seed function, not yet finished
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp97
1 files changed, 91 insertions, 6 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 33aef54..c2d08cb 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -7,6 +7,8 @@
#include <thread>
#include <chrono>
#include <sibs/SafeSerializer.hpp>
+#include <sibs/SafeDeserializer.hpp>
+#include <cassert>
using namespace dht;
using namespace std;
@@ -17,6 +19,10 @@ static int databaseCount = 0;
static time_t timeOffset = 0; // Updated by comparing local time with ntp server
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
+static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data");
+static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data");
+
+#define OPENDHT_INFOHASH_LEN 20
namespace odhtdb
{
@@ -66,18 +72,30 @@ namespace odhtdb
node.join();
}
+ void Database::seed()
+ {
+ // TODO: Use cached files and seed those. If none exists, request new files to seed.
+ // If nobody requests my cached files in a long time, request new files to seed and remove cached files
+ // (only if there are plenty of other seeders for the cached files. This could also cause race issue
+ // where all nodes with a cached file delete it at same time)
+
+ using std::placeholders::_1;
+ node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1));
+ node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1));
+ }
+
void Database::create(const Key &key, Group *primaryAdminGroup)
{
// TODO: Append fractions to get real microseconds time
u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
- stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds });
+ stagedCreateObjects.emplace_back(StagedCreateObject(key, primaryAdminGroup, timeMicroseconds));
}
void Database::add(const Key &key, DataView data)
{
// TODO: Append fractions to get real microseconds time
u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
- stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds });
+ stagedAddObjects.emplace_back(StagedAddObject(key, data, timeMicroseconds));
}
void Database::commit()
@@ -103,7 +121,9 @@ namespace odhtdb
// TODO: Use (ed25519 or poly1305) and curve25519
// TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
sibs::SafeSerializer serializer;
+ assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
+ serializer.add(stagedObject.timestamp);
serializer.add((u8)stagedObject.primaryAdminGroup->getName().size());
serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size());
serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size());
@@ -113,9 +133,9 @@ namespace odhtdb
serializer.add((u8*)user->getName().data(), user->getName().size());
}
- // TODO: Verify if serializer buffer needs to survive longer than this method
+ // TODO: Verify if serializer buffer needs to survive longer than this scope
Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
- node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
+ node.put(CREATE_DATA_HASH, move(value), [](bool ok)
{
// TODO: Handle failure to put data
if(!ok)
@@ -127,8 +147,15 @@ namespace odhtdb
{
// TODO: Use (ed25519 or poly1305) and curve25519
// TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
- Value value((const u8*)stagedObject.data.data, stagedObject.data.size);
- node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
+ sibs::SafeSerializer serializer;
+ assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
+ serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
+ serializer.add(stagedObject.timestamp);
+ serializer.add((u8*)stagedObject.data.data, stagedObject.data.size);
+
+ // TODO: Verify if serializer buffer needs to survive longer than this scope
+ Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
+ node.put(ADD_DATA_HASH, move(value), [](bool ok)
{
// TODO: Handle failure to put data
if(!ok)
@@ -144,4 +171,62 @@ namespace odhtdb
timestamp.fractions = 0; // TODO: Set this
return timestamp;
}
+
+ StagedCreateObject Database::deserializeCreateRequest(const std::shared_ptr<dht::Value> &value)
+ {
+ StagedCreateObject result;
+
+ sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
+ deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.timestamp = deserializer.extract<u64>();
+
+ return result;
+ }
+
+ StagedAddObject Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value)
+ {
+ StagedAddObject result;
+
+ sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
+ deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.timestamp = deserializer.extract<u64>();
+
+ return result;
+ }
+
+ bool Database::listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values)
+ {
+ for(const shared_ptr<Value> &value : values)
+ {
+ try
+ {
+ StagedCreateObject createObject = deserializeCreateRequest(value);
+ }
+ catch (sibs::DeserializeException &e)
+ {
+ fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what());
+ }
+ }
+ return true;
+ }
+
+ bool Database::listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values)
+ {
+ for(const shared_ptr<Value> &value : values)
+ {
+ try
+ {
+ StagedAddObject addObject = deserializeAddRequest(value);
+ }
+ catch (sibs::DeserializeException &e)
+ {
+ fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what());
+ }
+ }
+ return true;
+ }
} \ No newline at end of file