#include "../include/Database.hpp" #include "../include/Group.hpp" #include "../include/LocalUser.hpp" #include "../include/RemoteUser.hpp" #include #include #include #include #include #include #include #include using namespace dht; using namespace std; using namespace chrono_literals; static int databaseCount = 0; // TODO: Verify time_t is always signed static time_t timeOffset = 0; // Updated by comparing local time with ntp server static thread *ntpThread = nullptr; static bool timestampSynced = false; static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); const int OPENDHT_INFOHASH_LEN = 20; namespace odhtdb { Database::Database(const char *bootstrapNodeAddr, u16 port, boost::filesystem::path storageDir) { node.run(port, dht::crypto::generateIdentity(), true); fmt::MemoryWriter portStr; portStr << port; node.bootstrap(bootstrapNodeAddr, portStr.c_str()); // TODO: Make this work for multiple threads initializing database at same time ++databaseCount; if(databaseCount == 1) { if(ntpThread) delete ntpThread; ntpThread = new thread([]() { ntp::NtpClient ntpClient("pool.ntp.org"); while(databaseCount > 0) { ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp(); timeOffset = time(nullptr) - ntpTimestamp.seconds; timestampSynced = true; // TODO: Also use timestamp fraction (milliseconds) this_thread::sleep_for(60s); } timestampSynced = false; }); // TODO: Catch std::system_error instead of this if-statement if(ntpThread->joinable()) ntpThread->detach(); } while(!timestampSynced) { this_thread::sleep_for(10ms); } } Database::~Database() { // TODO: Make this work for multiple threads removing database object at same time --databaseCount; node.join(); } void Database::seed() { // TODO: Use cached files and seed those. If none exists, request new files to seed. // If nobody requests my cached files in a long time, request new files to seed and remove cached files // (only if there are plenty of other seeders for the cached files. This could also cause race issue // where all nodes with a cached file delete it at same time) using std::placeholders::_1; node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1)); } void Database::create(const Key &key, Group *primaryAdminGroup) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; stagedCreateObjects.emplace_back(StagedCreateObject(key, primaryAdminGroup, timeMicroseconds)); } void Database::add(const Key &key, DataView data) { // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; stagedAddObjects.emplace_back(StagedAddObject(key, data, timeMicroseconds)); } void Database::commit() { // TODO: Combine staged objects into one object for efficiency. 
    void Database::commit()
    {
        // TODO: Combine staged objects into one object for efficiency.
        // TODO: Add rollback
        printf("Num objects to create: %zu\n", stagedCreateObjects.size());
        for(StagedCreateObject &stagedObject : stagedCreateObjects)
        {
            commitStagedCreateObject(stagedObject);
        }
        stagedCreateObjects.clear();

        printf("Num objects to add: %zu\n", stagedAddObjects.size());
        for(StagedAddObject &stagedObject : stagedAddObjects)
        {
            commitStagedAddObject(stagedObject);
        }
        stagedAddObjects.clear();

        // TODO: Add node.listen here to get notified when remote peers have received the commit; only then can we return and report the commit as done
    }

    void Database::commitStagedCreateObject(const StagedCreateObject &stagedObject)
    {
        // TODO: Use (ed25519 or poly1305) and curve25519
        // TODO: Implement gas and price (refill when serving content (seeding) or by waiting).
        // This is done to prevent spamming and bandwidth leeching.
        sibs::SafeSerializer serializer;
        assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
        serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
        serializer.add(stagedObject.timestamp);
        serializer.add((u8)stagedObject.primaryAdminGroup->getName().size());
        serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size());
        assert(stagedObject.primaryAdminGroup->getUsers().size() <= 255);
        serializer.add((u8)stagedObject.primaryAdminGroup->getUsers().size());
        for(User *user : stagedObject.primaryAdminGroup->getUsers())
        {
            serializer.add((u8*)user->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES);
            serializer.add((u8)user->getName().size());
            serializer.add((u8*)user->getName().data(), user->getName().size());
        }

        // TODO: Verify whether the serializer buffer needs to survive longer than this scope
        Value createDataValue(serializer.getBuffer().data(), serializer.getBuffer().size());
        node.put(CREATE_DATA_HASH, move(createDataValue), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject");
        }/* TODO: How to make this work?, time_point(), false*/);

        // Post data for listeners of this key
        Value putKeyValue(serializer.getBuffer().data() + OPENDHT_INFOHASH_LEN, serializer.getBuffer().size() - OPENDHT_INFOHASH_LEN);
        node.put(stagedObject.key.hashedKey, move(putKeyValue), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put for listeners: %s, what to do?\n", "commitStagedCreateObject");
        });
    }
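
    // Wire format produced by commitStagedCreateObject above (and read back by
    // deserializeCreateRequest below). All length prefixes are a single byte, so
    // group/user names are capped at 255 bytes and a group at 255 users:
    //
    //     [20] hashed key (InfoHash)
    //     [8]  timestamp in microseconds (u64)
    //     [1]  admin group name length N, followed by [N] name bytes
    //     [1]  user count M, then M times:
    //          [PUBLIC_KEY_NUM_BYTES] public key, [1] name length K, [K] name bytes
    //
    // The value posted to listeners of the key skips the leading 20-byte hash,
    // since listeners already know which key they subscribed to.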
    void Database::commitStagedAddObject(const StagedAddObject &stagedObject)
    {
        // TODO: Use (ed25519 or poly1305) and curve25519
        // TODO: Implement gas and price (refill when serving content (seeding) or by waiting).
        // This is done to prevent spamming and bandwidth leeching.
        sibs::SafeSerializer serializer;
        assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN);
        serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
        serializer.add(stagedObject.timestamp);
        serializer.add((u8*)stagedObject.data.data, stagedObject.data.size);

        // TODO: Verify whether the serializer buffer needs to survive longer than this scope
        Value addDataValue(serializer.getBuffer().data(), serializer.getBuffer().size());
        node.put(ADD_DATA_HASH, move(addDataValue), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put for all: %s, what to do?\n", "commitStagedAddObject");
        }, time_point(), false);

        // Post data for listeners of this key
        Value putKeyValue(serializer.getBuffer().data() + OPENDHT_INFOHASH_LEN, serializer.getBuffer().size() - OPENDHT_INFOHASH_LEN);
        node.put(stagedObject.key.hashedKey, move(putKeyValue), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put for listeners: %s, what to do?\n", "commitStagedAddObject");
        }, time_point(), false);
    }

    ntp::NtpTimestamp Database::getSyncedTimestampUtc() const
    {
        assert(timestampSynced);
        ntp::NtpTimestamp timestamp;
        timestamp.seconds = time(nullptr) - timeOffset;
        timestamp.fractions = 0; // TODO: Set this
        return timestamp;
    }

    StagedCreateObject Database::deserializeCreateRequest(const std::shared_ptr<dht::Value> &value)
    {
        StagedCreateObject result;

        sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
        u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
        deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
        result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
        result.timestamp = deserializer.extract<u64>();
        u8 adminGroupNameSize = deserializer.extract<u8>();
        string adminGroupName;
        adminGroupName.resize(adminGroupNameSize);
        deserializer.extract((u8*)&adminGroupName[0], adminGroupNameSize);
        result.primaryAdminGroup = new Group(adminGroupName);
        u8 numUsers = deserializer.extract<u8>();
        for(int i = 0; i < numUsers; ++i)
        {
            char userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
            deserializer.extract((u8*)userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
            Signature::PublicKey userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
            u8 userNameSize = deserializer.extract<u8>();
            string userName;
            userName.resize(userNameSize);
            deserializer.extract((u8*)&userName[0], userNameSize);
            RemoteUser *user = RemoteUser::create(userPublicKey, userName);
            result.primaryAdminGroup->addUser(user);
        }
        // NOTE: There might be more data in the deserializer, but we can ignore it; we already got all the data we need
        return result;
    }

    StagedAddObject Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value)
    {
        StagedAddObject result;

        sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
        u8 entryKeyRaw[OPENDHT_INFOHASH_LEN];
        deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN);
        result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN);
        result.timestamp = deserializer.extract<u64>();
        return result;
    }

    bool Database::listenCreateData(std::shared_ptr<dht::Value> value)
    {
        printf("Got create data\n");
        try
        {
            // TODO: Verify createObject timestamp is not in the future
            StagedCreateObject createObject = deserializeCreateRequest(value);
            delete createObject.primaryAdminGroup;
        }
        catch (sibs::DeserializeException &e)
        {
            fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what());
        }
        return true;
    }
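
    // listenAddData below mirrors listenCreateData: deserialize and, for now,
    // discard. A sketch of the future-timestamp check flagged by the TODO in
    // listenCreateData (the 10 second grace window is an arbitrary choice):
    //
    //     u64 nowMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
    //     if(addObject.timestamp > nowMicroseconds + 10ull * 1000000ull)
    //         return true; // ignore values that claim to come from the future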
    bool Database::listenAddData(std::shared_ptr<dht::Value> value)
    {
        printf("Got add data\n");
        try
        {
            StagedAddObject addObject = deserializeAddRequest(value);
        }
        catch (sibs::DeserializeException &e)
        {
            fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what());
        }
        return true;
    }
}