aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/Database.hpp20
-rw-r--r--include/Key.hpp1
-rw-r--r--include/StagedObject.hpp38
-rw-r--r--src/Database.cpp97
-rw-r--r--tests/main.cpp2
5 files changed, 137 insertions, 21 deletions
diff --git a/include/Database.hpp b/include/Database.hpp
index 20d7513..68fff62 100644
--- a/include/Database.hpp
+++ b/include/Database.hpp
@@ -2,6 +2,7 @@
#include "types.hpp"
#include "Key.hpp"
+#include "StagedObject.hpp"
#include "DataView.hpp"
#include <opendht/dhtrunner.h>
#include <vector>
@@ -11,26 +12,13 @@ namespace odhtdb
{
class Group;
- struct StagedCreateObject
- {
- Key key;
- Group *primaryAdminGroup;
- u64 timestamp; // In microseconds
- };
-
- struct StagedAddObject
- {
- Key key;
- DataView data;
- u64 timestamp; // In microseconds
- };
-
class Database
{
public:
Database(const char *bootstrapNodeAddr, u16 port);
~Database();
+ void seed();
void create(const Key &key, Group *primaryAdminGroup);
void add(const Key &key, DataView data);
void commit();
@@ -38,6 +26,10 @@ namespace odhtdb
void commitStagedCreateObject(const StagedCreateObject &stagedObject);
void commitStagedAddObject(const StagedAddObject &stagedObject);
ntp::NtpTimestamp getSyncedTimestampUtc() const;
+ StagedCreateObject deserializeCreateRequest(const std::shared_ptr<dht::Value> &value);
+ StagedAddObject deserializeAddRequest(const std::shared_ptr<dht::Value> &value);
+ bool listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values);
+ bool listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values);
private:
dht::DhtRunner node;
std::vector<StagedCreateObject> stagedCreateObjects;
diff --git a/include/Key.hpp b/include/Key.hpp
index e9f0591..505050d 100644
--- a/include/Key.hpp
+++ b/include/Key.hpp
@@ -7,6 +7,7 @@ namespace odhtdb
class Key
{
public:
+ Key() {}
Key(const char *key) : hashedKey(dht::InfoHash::get(key)) {}
dht::InfoHash hashedKey;
diff --git a/include/StagedObject.hpp b/include/StagedObject.hpp
new file mode 100644
index 0000000..61e1073
--- /dev/null
+++ b/include/StagedObject.hpp
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "Key.hpp"
+#include "types.hpp"
+#include "DataView.hpp"
+
+namespace odhtdb
+{
+ class Group;
+
+ struct StagedCreateObject
+ {
+ Key key;
+ Group *primaryAdminGroup;
+ u64 timestamp; // In microseconds
+
+ StagedCreateObject() : key(), primaryAdminGroup(nullptr), timestamp(0) {}
+ StagedCreateObject(const Key &_key, Group *_primaryAdminGroup, u64 _timestamp) :
+ key(_key), primaryAdminGroup(_primaryAdminGroup), timestamp(_timestamp)
+ {
+
+ }
+ };
+
+ struct StagedAddObject
+ {
+ Key key;
+ DataView data;
+ u64 timestamp; // In microseconds
+
+ StagedAddObject() : key(), data(), timestamp(0) {}
+ StagedAddObject(const Key &_key, const DataView &_data, u64 _timestamp) :
+ key(_key), data(_data), timestamp(_timestamp)
+ {
+
+ }
+ };
+} \ No newline at end of file
diff --git a/src/Database.cpp b/src/Database.cpp
index 33aef54..c2d08cb 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -7,6 +7,8 @@
#include <thread>
#include <chrono>
#include <sibs/SafeSerializer.hpp>
+#include <sibs/SafeDeserializer.hpp>
+#include <cassert>
using namespace dht;
using namespace std;
@@ -17,6 +19,10 @@ static int databaseCount = 0;
static time_t timeOffset = 0; // Updated by comparing local time with ntp server
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
+static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data");
+static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data");
+
+#define OPENDHT_INFOHASH_LEN 20
namespace odhtdb
{
@@ -66,18 +72,30 @@ namespace odhtdb
node.join();
}
+ void Database::seed()
+ {
+ // TODO: Use cached files and seed those. If none exists, request new files to seed.
+ // If nobody requests my cached files in a long time, request new files to seed and remove cached files
+ // (only if there are plenty of other seeders for the cached files. This could also cause race issue
+ // where all nodes with a cached file delete it at same time)
+
+ using std::placeholders::_1;
+ node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1));
+ node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1));
+ }
+
void Database::create(const Key &key, Group *primaryAdminGroup)
{
// TODO: Append fractions to get real microseconds time
u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
- stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds });
+ stagedCreateObjects.emplace_back(StagedCreateObject(key, primaryAdminGroup, timeMicroseconds));
}
void Database::add(const Key &key, DataView data)
{
// TODO: Append fractions to get real microseconds time
u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
- stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds });
+ stagedAddObjects.emplace_back(StagedAddObject(key, data, timeMicroseconds));
}
void Database::commit()
@@ -103,7 +121,9 @@ namespace odhtdb
// TODO: Use (ed25519 or poly1305) and curve25519
// TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
sibs::SafeSerializer serializer;
+ assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
+ serializer.add(stagedObject.timestamp);
serializer.add((u8)stagedObject.primaryAdminGroup->getName().size());
serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size());
serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size());
@@ -113,9 +133,9 @@ namespace odhtdb
serializer.add((u8*)user->getName().data(), user->getName().size());
}
- // TODO: Verify if serializer buffer needs to survive longer than this method
+ // TODO: Verify if serializer buffer needs to survive longer than this scope
Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
- node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
+ node.put(CREATE_DATA_HASH, move(value), [](bool ok)
{
// TODO: Handle failure to put data
if(!ok)
@@ -127,8 +147,15 @@ namespace odhtdb
{
// TODO: Use (ed25519 or poly1305) and curve25519
// TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
- Value value((const u8*)stagedObject.data.data, stagedObject.data.size);
- node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
+ sibs::SafeSerializer serializer;
+ assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
+ serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
+ serializer.add(stagedObject.timestamp);
+ serializer.add((u8*)stagedObject.data.data, stagedObject.data.size);
+
+ // TODO: Verify if serializer buffer needs to survive longer than this scope
+ Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
+ node.put(ADD_DATA_HASH, move(value), [](bool ok)
{
// TODO: Handle failure to put data
if(!ok)
@@ -144,4 +171,62 @@ namespace odhtdb
timestamp.fractions = 0; // TODO: Set this
return timestamp;
}
+
+ StagedCreateObject Database::deserializeCreateRequest(const std::shared_ptr<dht::Value> &value)
+ {
+ StagedCreateObject result;
+
+ sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
+ deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.timestamp = deserializer.extract<u64>();
+
+ return result;
+ }
+
+ StagedAddObject Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value)
+ {
+ StagedAddObject result;
+
+ sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
+ deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
+ result.timestamp = deserializer.extract<u64>();
+
+ return result;
+ }
+
+ bool Database::listenCreateData(const std::vector<std::shared_ptr<dht::Value>> &values)
+ {
+ for(const shared_ptr<Value> &value : values)
+ {
+ try
+ {
+ StagedCreateObject createObject = deserializeCreateRequest(value);
+ }
+ catch (sibs::DeserializeException &e)
+ {
+ fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what());
+ }
+ }
+ return true;
+ }
+
+ bool Database::listenAddData(const std::vector<std::shared_ptr<dht::Value>> &values)
+ {
+ for(const shared_ptr<Value> &value : values)
+ {
+ try
+ {
+ StagedAddObject addObject = deserializeAddRequest(value);
+ }
+ catch (sibs::DeserializeException &e)
+ {
+ fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what());
+ }
+ }
+ return true;
+ }
} \ No newline at end of file
diff --git a/tests/main.cpp b/tests/main.cpp
index 2618d18..5ddcbe9 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -10,9 +10,9 @@ int main()
// TODO: For tests, dont run against bootstrap.ring.cx.
// Run against a bootstrap node made only for testing which doesn't persist added data.
Database database("bootstrap.ring.cx", 4222);
+ database.seed();
LocalUser *localUser = LocalUser::create("dec05eba");
-
Group group("admin");
group.addUser(localUser);
database.create("galax.channel.latenight.chat", &group);