aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAleksi Lindeman <aleksi_888@hotmail.com>2018-02-01 21:15:13 +0100
committerAleksi Lindeman <aleksi_888@hotmail.com>2018-02-01 21:15:19 +0100
commit6f1089db78f14b52b869f5aaa979e52ff5e4c2d7 (patch)
treed79f7abfb0fb9f3d58f9716741512b63fb4193ec /src
parent80a9c135b8bdca64246f147f22d98485e0f05ee5 (diff)
Sync time with ntp server, starting with basic operations
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp113
-rw-r--r--src/Group.cpp24
-rw-r--r--src/main.cpp51
3 files changed, 130 insertions, 58 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 4fa8405..33aef54 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -1,9 +1,22 @@
#include "../include/Database.hpp"
+#include "../include/Group.hpp"
+#include "../include/User.hpp"
#include <opendht.h>
#include <fmt/format.h>
+#include <sodium/crypto_box_curve25519xchacha20poly1305.h>
+#include <thread>
+#include <chrono>
+#include <sibs/SafeSerializer.hpp>
using namespace dht;
using namespace std;
+using namespace chrono_literals;
+
+static int databaseCount = 0;
+// TODO: Verify time_t is always signed
+static time_t timeOffset = 0; // Updated by comparing local time with ntp server
+static thread *ntpThread = nullptr;
+static bool timestampSynced = false;
namespace odhtdb
{
@@ -13,36 +26,122 @@ namespace odhtdb
fmt::MemoryWriter portStr;
portStr << port;
node.bootstrap(bootstrapNodeAddr, portStr.c_str());
+
+ // TODO: Make this work for multiple threads initializing database at same time
+ ++databaseCount;
+ if(databaseCount == 1)
+ {
+ if(ntpThread)
+ delete ntpThread;
+
+ ntpThread = new thread([]()
+ {
+ ntp::NtpClient ntpClient("pool.ntp.org");
+ while(databaseCount > 0)
+ {
+ ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp();
+ timeOffset = time(nullptr) - ntpTimestamp.seconds;
+ timestampSynced = true;
+ // TODO: Also use timestamp fraction (milliseconds)
+ this_thread::sleep_for(60s);
+ }
+ timestampSynced = false;
+ });
+
+ // TODO: Catch std::system_error instead of this if-statement
+ if(ntpThread->joinable())
+ ntpThread->detach();
+ }
+
+ while(!timestampSynced)
+ {
+ this_thread::sleep_for(10ms);
+ }
}
Database::~Database()
{
+ // TODO: Make this work for multiple threads removing database object at same time
+ --databaseCount;
node.join();
}
+ void Database::create(const Key &key, Group *primaryAdminGroup)
+ {
+ // TODO: Append fractions to get real microseconds time
+ u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
+ stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds });
+ }
+
void Database::add(const Key &key, DataView data)
{
- stagedObjects.emplace_back(StagedObject{ key, data });
+ // TODO: Append fractions to get real microseconds time
+ u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
+ stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds });
}
void Database::commit()
{
- // TODO: Combine staged objects into one object
- for(StagedObject stagedObject : stagedObjects)
+ // TODO: Combine staged objects into one object for efficiency.
+ // TODO: Add rollback
+
+ for(StagedCreateObject &stagedObject : stagedCreateObjects)
{
- commitStagedObject(stagedObject);
+ commitStagedCreateObject(stagedObject);
}
- stagedObjects.clear();
+ stagedCreateObjects.clear();
+
+ for(StagedAddObject &stagedObject : stagedAddObjects)
+ {
+ commitStagedAddObject(stagedObject);
+ }
+ stagedAddObjects.clear();
+ }
+
+ void Database::commitStagedCreateObject(const StagedCreateObject &stagedObject)
+ {
+ // TODO: Use (ed25519 or poly1305) and curve25519
+ // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
+ sibs::SafeSerializer serializer;
+ serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
+ serializer.add((u8)stagedObject.primaryAdminGroup->getName().size());
+ serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size());
+ serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size());
+ for(User *user : stagedObject.primaryAdminGroup->getUsers())
+ {
+ serializer.add((u8)user->getName().size());
+ serializer.add((u8*)user->getName().data(), user->getName().size());
+ }
+
+ // TODO: Verify if serializer buffer needs to survive longer than this method
+ Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
+ node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
+ {
+ // TODO: Handle failure to put data
+ if(!ok)
+ fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject");
+ }, time_point(), false);
}
- void Database::commitStagedObject(const StagedObject &stagedObject)
+ void Database::commitStagedAddObject(const StagedAddObject &stagedObject)
{
+ // TODO: Use (ed25519 or poly1305) and curve25519
+ // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
Value value((const u8*)stagedObject.data.data, stagedObject.data.size);
node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
{
// TODO: Handle failure to put data
if(!ok)
- fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedObject");
+ fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject");
}, time_point(), false);
}
+
+ ntp::NtpTimestamp Database::getSyncedTimestampUtc() const
+ {
+ assert(timestampSynced);
+ ntp::NtpTimestamp timestamp;
+ timestamp.seconds = time(nullptr) - timeOffset;
+ timestamp.fractions = 0; // TODO: Set this
+ return timestamp;
+ }
} \ No newline at end of file
diff --git a/src/Group.cpp b/src/Group.cpp
index 213a0bb..c87be1e 100644
--- a/src/Group.cpp
+++ b/src/Group.cpp
@@ -1,8 +1,17 @@
#include "../include/Group.hpp"
#include "../include/User.hpp"
+using namespace std;
+
namespace odhtdb
{
+ Group::Group(const string &_name) :
+ name(_name)
+ {
+ if(name.size() > 255)
+ throw GroupNameTooLongException(name);
+ }
+
Group::~Group()
{
for(User *user : users)
@@ -10,4 +19,19 @@ namespace odhtdb
delete user;
}
}
+
+ void Group::addUser(User *user)
+ {
+ users.push_back(user);
+ }
+
+ const string& Group::getName() const
+ {
+ return name;
+ }
+
+ const vector<User*>& Group::getUsers() const
+ {
+ return users;
+ }
} \ No newline at end of file
diff --git a/src/main.cpp b/src/main.cpp
deleted file mode 100644
index 8e38e18..0000000
--- a/src/main.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-#include <opendht.h>
-#include <vector>
-#include "../include/Database.hpp"
-
-using namespace odhtdb;
-
-int main()
-{
- Database database("bootstrap.ring.cx", 4222);
- Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat");
- const char *data = "hello, world!";
- database.add(channelChatKey, DataView{ (void*)data, strlen(data) });
- database.commit();
-
- /*
- Database database("bootstrap.ring.cx", 4222);
- Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat");
- database.add(channelChatKey, { localUser, date, "hello, world!" });
- */
-
-#if 0
- dht::DhtRunner node;
-
- // Launch a dht node on a new thread, using a
- // generated RSA key pair, and listen on port 4222.
- node.run(4222, dht::crypto::generateIdentity(), true);
-
- // Join the network through any running node,
- // here using a known bootstrap node.
- node.bootstrap("bootstrap.ring.cx", "4222");
-
- // put some data on the dht
- std::vector<uint8_t> some_data(5, 10);
- node.put("unique_key", some_data);
-
- // put some data on the dht, signed with our generated private key
- node.putSigned("unique_key_42", some_data);
-
- // get data from the dht
- node.get("other_unique_key", [](const std::vector<std::shared_ptr<dht::Value>>& values) {
- // Callback called when values are found
- for (const auto& value : values)
- std::cout << "Found value: " << *value << std::endl;
- return true; // return false to stop the search
- });
-
- // wait for dht threads to end
- node.join();
- #endif
- return 0;
-} \ No newline at end of file