diff options
author | Aleksi Lindeman <aleksi_888@hotmail.com> | 2018-02-01 21:15:13 +0100 |
---|---|---|
committer | Aleksi Lindeman <aleksi_888@hotmail.com> | 2018-02-01 21:15:19 +0100 |
commit | 6f1089db78f14b52b869f5aaa979e52ff5e4c2d7 (patch) | |
tree | d79f7abfb0fb9f3d58f9716741512b63fb4193ec | |
parent | 80a9c135b8bdca64246f147f22d98485e0f05ee5 (diff) |
Sync time with ntp server, starting with basic operations
-rw-r--r-- | .vscode/c_cpp_properties.json | 4 | ||||
-rw-r--r-- | README.md | 12 | ||||
-rw-r--r-- | include/Database.hpp | 21 | ||||
-rw-r--r-- | include/Group.hpp | 19 | ||||
-rw-r--r-- | include/LocalUser.hpp | 17 | ||||
-rw-r--r-- | include/User.hpp | 21 | ||||
-rw-r--r-- | project.conf | 6 | ||||
-rw-r--r-- | src/Database.cpp | 113 | ||||
-rw-r--r-- | src/Group.cpp | 24 | ||||
-rw-r--r-- | src/main.cpp | 51 | ||||
-rw-r--r-- | tests/main.cpp | 24 |
11 files changed, 246 insertions, 66 deletions
diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json index 6c5092f..aff2d2c 100644 --- a/.vscode/c_cpp_properties.json +++ b/.vscode/c_cpp_properties.json @@ -34,6 +34,8 @@ "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include-fixed", "/usr/include", "~/.sibs/lib/fmt/4.1.0", + "~/.sibs/lib/ntpclient/0.1.0/include", + "~/.sibs/lib/sibs-serializer/0.1.0/", "${workspaceRoot}" ], "defines": [], @@ -48,6 +50,8 @@ "/usr/lib/gcc/x86_64-pc-linux-gnu/7.2.1/include-fixed", "/usr/include", "~/.sibs/lib/fmt/4.1.0", + "~/.sibs/lib/ntpclient/0.1.0/include", + "~/.sibs/lib/sibs-serializer/0.1.0/", "${workspaceRoot}" ], "limitSymbolsToIncludedHeaders": true, @@ -1,2 +1,14 @@ # odhtdb OpenDHT Database + +# Limits +Only 65kb of data can be used for each `add`. You can add more data by using `add` several times. +User and group name can't be longer than 255 characters. + +# TODO +## Node banning +Ban nodes that spam put or get (malicious nodes). If data is routed, then the router node should first ban the malicious node so the router node is not banned if it's not malicious. +But how can we know if data was routed? does opendht expose this to other nodes in some way? +## Error handling +Currently operations are executed without knowing if they succeed or not. Operations should be modified to perhaps return std::future or allow +used to pass a callback function which is called with the operation result.
\ No newline at end of file diff --git a/include/Database.hpp b/include/Database.hpp index 0b324e8..20d7513 100644 --- a/include/Database.hpp +++ b/include/Database.hpp @@ -5,13 +5,24 @@ #include "DataView.hpp" #include <opendht/dhtrunner.h> #include <vector> +#include <ntp/NtpClient.hpp> namespace odhtdb { - struct StagedObject + class Group; + + struct StagedCreateObject + { + Key key; + Group *primaryAdminGroup; + u64 timestamp; // In microseconds + }; + + struct StagedAddObject { Key key; DataView data; + u64 timestamp; // In microseconds }; class Database @@ -20,12 +31,16 @@ namespace odhtdb Database(const char *bootstrapNodeAddr, u16 port); ~Database(); + void create(const Key &key, Group *primaryAdminGroup); void add(const Key &key, DataView data); void commit(); private: - void commitStagedObject(const StagedObject &stagedObject); + void commitStagedCreateObject(const StagedCreateObject &stagedObject); + void commitStagedAddObject(const StagedAddObject &stagedObject); + ntp::NtpTimestamp getSyncedTimestampUtc() const; private: dht::DhtRunner node; - std::vector<StagedObject> stagedObjects; + std::vector<StagedCreateObject> stagedCreateObjects; + std::vector<StagedAddObject> stagedAddObjects; }; }
\ No newline at end of file diff --git a/include/Group.hpp b/include/Group.hpp index dafc05a..c909728 100644 --- a/include/Group.hpp +++ b/include/Group.hpp @@ -1,19 +1,34 @@ #pragma once -#include <opendht/crypto.h> #include <string> #include <vector> +#include <stdexcept> namespace odhtdb { class User; + class GroupNameTooLongException : public std::runtime_error + { + public: + GroupNameTooLongException(const std::string &groupName) : + std::runtime_error(std::string("The group name ") + groupName + " is longer than 255 bytes") + { + + } + }; + class Group { public: + Group(const std::string &name); ~Group(); + + void addUser(User *user); + + const std::string& getName() const; + const std::vector<User*>& getUsers() const; private: - dht::crypto::PublicKey publicKey; std::string name; std::vector<User*> users; }; diff --git a/include/LocalUser.hpp b/include/LocalUser.hpp new file mode 100644 index 0000000..200f30f --- /dev/null +++ b/include/LocalUser.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "User.hpp" + +namespace odhtdb +{ + class LocalUser : public User + { + public: + static LocalUser* create(const std::string &name) + { + return new LocalUser(name); + } + private: + LocalUser(const std::string &name) : User(name){} + }; +}
\ No newline at end of file diff --git a/include/User.hpp b/include/User.hpp index 727bbcf..e542434 100644 --- a/include/User.hpp +++ b/include/User.hpp @@ -1,15 +1,32 @@ #pragma once -#include <opendht/crypto.h> #include <string> +#include <stdexcept> namespace odhtdb { + class UserNameTooLongException : public std::runtime_error + { + public: + UserNameTooLongException(const std::string &userName) : + std::runtime_error(std::string("The username ") + userName + " is longer than 255 bytes") + { + + } + }; + class User { public: + const std::string& getName() const { return name; } + protected: + User(const std::string &_name) : name(_name) + { + if(name.size() > 255) + throw UserNameTooLongException(name); + } private: - dht::crypto::PublicKey publicKey; + // TODO: Add public key std::string name; }; }
\ No newline at end of file diff --git a/project.conf b/project.conf index 6c261d6..8348027 100644 --- a/project.conf +++ b/project.conf @@ -1,9 +1,13 @@ [package] name = "odhtdb" version = "0.1.0" -type = "executable" +type = "library" platforms = ["linux32", "linux64"] +tests = "tests" [dependencies] opendht = "1.5.0" fmt = "4.1.0" +libsodium = "1.0.16" +ntpclient = "0.1.0" +sibs-serializer = "0.1.0"
\ No newline at end of file diff --git a/src/Database.cpp b/src/Database.cpp index 4fa8405..33aef54 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -1,9 +1,22 @@ #include "../include/Database.hpp" +#include "../include/Group.hpp" +#include "../include/User.hpp" #include <opendht.h> #include <fmt/format.h> +#include <sodium/crypto_box_curve25519xchacha20poly1305.h> +#include <thread> +#include <chrono> +#include <sibs/SafeSerializer.hpp> using namespace dht; using namespace std; +using namespace chrono_literals; + +static int databaseCount = 0; +// TODO: Verify time_t is always signed +static time_t timeOffset = 0; // Updated by comparing local time with ntp server +static thread *ntpThread = nullptr; +static bool timestampSynced = false; namespace odhtdb { @@ -13,36 +26,122 @@ namespace odhtdb fmt::MemoryWriter portStr; portStr << port; node.bootstrap(bootstrapNodeAddr, portStr.c_str()); + + // TODO: Make this work for multiple threads initializing database at same time + ++databaseCount; + if(databaseCount == 1) + { + if(ntpThread) + delete ntpThread; + + ntpThread = new thread([]() + { + ntp::NtpClient ntpClient("pool.ntp.org"); + while(databaseCount > 0) + { + ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp(); + timeOffset = time(nullptr) - ntpTimestamp.seconds; + timestampSynced = true; + // TODO: Also use timestamp fraction (milliseconds) + this_thread::sleep_for(60s); + } + timestampSynced = false; + }); + + // TODO: Catch std::system_error instead of this if-statement + if(ntpThread->joinable()) + ntpThread->detach(); + } + + while(!timestampSynced) + { + this_thread::sleep_for(10ms); + } } Database::~Database() { + // TODO: Make this work for multiple threads removing database object at same time + --databaseCount; node.join(); } + void Database::create(const Key &key, Group *primaryAdminGroup) + { + // TODO: Append fractions to get real microseconds time + u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds }); + } + void Database::add(const Key &key, DataView data) { - stagedObjects.emplace_back(StagedObject{ key, data }); + // TODO: Append fractions to get real microseconds time + u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds }); } void Database::commit() { - // TODO: Combine staged objects into one object - for(StagedObject stagedObject : stagedObjects) + // TODO: Combine staged objects into one object for efficiency. + // TODO: Add rollback + + for(StagedCreateObject &stagedObject : stagedCreateObjects) { - commitStagedObject(stagedObject); + commitStagedCreateObject(stagedObject); } - stagedObjects.clear(); + stagedCreateObjects.clear(); + + for(StagedAddObject &stagedObject : stagedAddObjects) + { + commitStagedAddObject(stagedObject); + } + stagedAddObjects.clear(); + } + + void Database::commitStagedCreateObject(const StagedCreateObject &stagedObject) + { + // TODO: Use (ed25519 or poly1305) and curve25519 + // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) + sibs::SafeSerializer serializer; + serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size()); + serializer.add((u8)stagedObject.primaryAdminGroup->getName().size()); + serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size()); + serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size()); + for(User *user : stagedObject.primaryAdminGroup->getUsers()) + { + serializer.add((u8)user->getName().size()); + serializer.add((u8*)user->getName().data(), user->getName().size()); + } + + // TODO: Verify if serializer buffer needs to survive longer than this method + Value value(serializer.getBuffer().data(), serializer.getBuffer().size()); + node.put(stagedObject.key.hashedKey, move(value), [](bool ok) + { + // TODO: Handle failure to put data + if(!ok) + fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); + }, time_point(), false); } - void Database::commitStagedObject(const StagedObject &stagedObject) + void Database::commitStagedAddObject(const StagedAddObject &stagedObject) { + // TODO: Use (ed25519 or poly1305) and curve25519 + // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) Value value((const u8*)stagedObject.data.data, stagedObject.data.size); node.put(stagedObject.key.hashedKey, move(value), [](bool ok) { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedObject"); + fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); }, time_point(), false); } + + ntp::NtpTimestamp Database::getSyncedTimestampUtc() const + { + assert(timestampSynced); + ntp::NtpTimestamp timestamp; + timestamp.seconds = time(nullptr) - timeOffset; + timestamp.fractions = 0; // TODO: Set this + return timestamp; + } }
\ No newline at end of file diff --git a/src/Group.cpp b/src/Group.cpp index 213a0bb..c87be1e 100644 --- a/src/Group.cpp +++ b/src/Group.cpp @@ -1,8 +1,17 @@ #include "../include/Group.hpp" #include "../include/User.hpp" +using namespace std; + namespace odhtdb { + Group::Group(const string &_name) : + name(_name) + { + if(name.size() > 255) + throw GroupNameTooLongException(name); + } + Group::~Group() { for(User *user : users) @@ -10,4 +19,19 @@ namespace odhtdb delete user; } } + + void Group::addUser(User *user) + { + users.push_back(user); + } + + const string& Group::getName() const + { + return name; + } + + const vector<User*>& Group::getUsers() const + { + return users; + } }
\ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp deleted file mode 100644 index 8e38e18..0000000 --- a/src/main.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include <opendht.h> -#include <vector> -#include "../include/Database.hpp" - -using namespace odhtdb; - -int main() -{ - Database database("bootstrap.ring.cx", 4222); - Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat"); - const char *data = "hello, world!"; - database.add(channelChatKey, DataView{ (void*)data, strlen(data) }); - database.commit(); - - /* - Database database("bootstrap.ring.cx", 4222); - Key channelChatKey("galax.channel.CAGERIJF232dKADS528392fawdkf3fas.chat"); - database.add(channelChatKey, { localUser, date, "hello, world!" }); - */ - -#if 0 - dht::DhtRunner node; - - // Launch a dht node on a new thread, using a - // generated RSA key pair, and listen on port 4222. - node.run(4222, dht::crypto::generateIdentity(), true); - - // Join the network through any running node, - // here using a known bootstrap node. - node.bootstrap("bootstrap.ring.cx", "4222"); - - // put some data on the dht - std::vector<uint8_t> some_data(5, 10); - node.put("unique_key", some_data); - - // put some data on the dht, signed with our generated private key - node.putSigned("unique_key_42", some_data); - - // get data from the dht - node.get("other_unique_key", [](const std::vector<std::shared_ptr<dht::Value>>& values) { - // Callback called when values are found - for (const auto& value : values) - std::cout << "Found value: " << *value << std::endl; - return true; // return false to stop the search - }); - - // wait for dht threads to end - node.join(); - #endif - return 0; -}
\ No newline at end of file diff --git a/tests/main.cpp b/tests/main.cpp new file mode 100644 index 0000000..2618d18 --- /dev/null +++ b/tests/main.cpp @@ -0,0 +1,24 @@ +#include <vector> +#include "../include/Database.hpp" +#include "../include/Group.hpp" +#include "../include/LocalUser.hpp" + +using namespace odhtdb; + +int main() +{ + // TODO: For tests, dont run against bootstrap.ring.cx. + // Run against a bootstrap node made only for testing which doesn't persist added data. + Database database("bootstrap.ring.cx", 4222); + + LocalUser *localUser = LocalUser::create("dec05eba"); + + Group group("admin"); + group.addUser(localUser); + database.create("galax.channel.latenight.chat", &group); + + const char *data = "hello, world!"; + database.add("galax.channel.latenight.chat", DataView{ (void*)data, strlen(data) }); + database.commit(); + return 0; +}
\ No newline at end of file |