diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Database.cpp | 113 | ||||
-rw-r--r-- | src/Group.cpp | 24 | ||||
-rw-r--r-- | src/main.cpp | 51 |
3 files changed, 130 insertions, 58 deletions
diff --git a/src/Database.cpp b/src/Database.cpp index 4fa8405..33aef54 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -1,9 +1,22 @@ #include "../include/Database.hpp" +#include "../include/Group.hpp" +#include "../include/User.hpp" #include <opendht.h> #include <fmt/format.h> +#include <sodium/crypto_box_curve25519xchacha20poly1305.h> +#include <thread> +#include <chrono> +#include <sibs/SafeSerializer.hpp> using namespace dht; using namespace std; +using namespace chrono_literals; + +static int databaseCount = 0; +// TODO: Verify time_t is always signed +static time_t timeOffset = 0; // Updated by comparing local time with ntp server +static thread *ntpThread = nullptr; +static bool timestampSynced = false; namespace odhtdb { @@ -13,36 +26,122 @@ namespace odhtdb fmt::MemoryWriter portStr; portStr << port; node.bootstrap(bootstrapNodeAddr, portStr.c_str()); + + // TODO: Make this work for multiple threads initializing database at same time + ++databaseCount; + if(databaseCount == 1) + { + if(ntpThread) + delete ntpThread; + + ntpThread = new thread([]() + { + ntp::NtpClient ntpClient("pool.ntp.org"); + while(databaseCount > 0) + { + ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp(); + timeOffset = time(nullptr) - ntpTimestamp.seconds; + timestampSynced = true; + // TODO: Also use timestamp fraction (milliseconds) + this_thread::sleep_for(60s); + } + timestampSynced = false; + }); + + // TODO: Catch std::system_error instead of this if-statement + if(ntpThread->joinable()) + ntpThread->detach(); + } + + while(!timestampSynced) + { + this_thread::sleep_for(10ms); + } } Database::~Database() { + // TODO: Make this work for multiple threads removing database object at same time + --databaseCount; node.join(); } + void Database::create(const Key &key, Group *primaryAdminGroup) + { + // TODO: Append fractions to get real microseconds time + u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds }); + } + void Database::add(const Key &key, DataView data) { - stagedObjects.emplace_back(StagedObject{ key, data }); + // TODO: Append fractions to get real microseconds time + u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds }); } void Database::commit() { - // TODO: Combine staged objects into one object - for(StagedObject stagedObject : stagedObjects) + // TODO: Combine staged objects into one object for efficiency. + // TODO: Add rollback + + for(StagedCreateObject &stagedObject : stagedCreateObjects) { - commitStagedObject(stagedObject); + commitStagedCreateObject(stagedObject); } - stagedObjects.clear(); + stagedCreateObjects.clear(); + + for(StagedAddObject &stagedObject : stagedAddObjects) + { + commitStagedAddObject(stagedObject); + } + stagedAddObjects.clear(); + } + + void Database::commitStagedCreateObject(const StagedCreateObject &stagedObject) + { + // TODO: Use (ed25519 or poly1305) and curve25519 + // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) + sibs::SafeSerializer serializer; + serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add((u8)stagedObject.primaryAdminGroup->getName().size()); + serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size()); + serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size()); + for(User *user : stagedObject.primaryAdminGroup->getUsers()) + { + serializer.add((u8)user->getName().size()); + serializer.add((u8*)user->getName().data(), user->getName().size()); + } + + // TODO: Verify if serializer buffer needs to survive longer than this method + Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); + node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + { + // TODO: Handle failure to put data + if(!ok) + fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); + }, time_point(), false); } - void Database::commitStagedObject(const StagedObject &stagedObject) + void Database::commitStagedAddObject(const StagedAddObject &stagedObject) { + // TODO: Use (ed25519 or poly1305) and curve25519 + // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) Value value((const u8*)stagedObject.data.data, stagedObject.data.size); node.put(stagedObject.key.hashedKey, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedObject"); + fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); }, time_point(), false); } + + ntp::NtpTimestamp Database::getSyncedTimestampUtc() const + { + assert(timestampSynced); + ntp::NtpTimestamp timestamp; + timestamp.seconds = time(nullptr) - timeOffset; + timestamp.fractions = 0; // TODO: Set this + return timestamp; + } }
\ No newline at end of file diff --git a/src/Group.cpp b/src/Group.cpp index 213a0bb..c87be1e 100644 --- a/src/Group.cpp +++ b/src/Group.cpp @@ -1,8 +1,17 @@ #include "../include/Group.hpp" #include "../include/User.hpp" +using namespace std; + namespace odhtdb { + Group::Group(const string &_name) : + name(_name) + { + if(name.size() > 255) + throw GroupNameTooLongException(name); + } + Group::~Group() { for(User *user : users) @@ -10,4 +19,19 @@ namespace odhtdb delete user; } } + + void Group::addUser(User *user) + { + users.push_back(user); + } + + const string& Group::getName() const + { + return name; + } + + const vector<User*>& Group::getUsers() const + { + return users; + } }
\ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp deleted file mode 100644 index 8e38e18..0000000 --- a/src/main.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include <opendht.h> -#include <vector> -#include "../include/Database.hpp" - -using namespace odhtdb; - -int main() -{ - Database database("bootstrap.ring.cx", 4222); - Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat"); - const char *data = "hello, world!"; - database.add(channelChatKey, DataView{ (void*)data, strlen(data) }); - database.commit(); - - /* - Database database("bootstrap.ring.cx", 4222); - Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat"); - database.add(channelChatKey, { localUser, date, "hello, world!" }); - */ - -#if 0 - dht::DhtRunner node; - - // Launch a dht node on a new thread, using a - // generated RSA key pair, and listen on port 4222. - node.run(4222, dht::crypto::generateIdentity(), true); - - // Join the network through any running node, - // here using a known bootstrap node. - node.bootstrap("bootstrap.ring.cx", "4222"); - - // put some data on the dht - std::vector<uint8_t> some_data(5, 10); - node.put("unique_key", some_data); - - // put some data on the dht, signed with our generated private key - node.putSigned("unique_key_42", some_data); - - // get data from the dht - node.get("other_unique_key", [](const std::vector<std::shared_ptr<dht::Value>>& values) { - // Callback called when values are found - for (const auto& value : values) - std::cout << "Found value: " << *value << std::endl; - return true; // return false to stop the search - }); - - // wait for dht threads to end - node.join(); - #endif - return 0; -}
\ No newline at end of file |