#include "../include/Database.hpp" #include "../include/Group.hpp" #include "../include/LocalUser.hpp" #include "../include/RemoteUser.hpp" #include "../include/Encryption.hpp" #include "../include/DhtKey.hpp" #include "../include/bin2hex.hpp" #include #include #include #include #include #include #include #include using namespace dht; using namespace std; using namespace chrono_literals; static int databaseCount = 0; // TODO: Verify time_t is always signed static time_t timeOffset = 0; // Updated by comparing local time with ntp server static thread *ntpThread = nullptr; static bool timestampSynced = false; static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); const int OPENDHT_INFOHASH_LEN = 20; namespace odhtdb { const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 0; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 0; DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) { usize allocationSize = headerSerializer.getBuffer().size() + encryptedData.getNonce().size + encryptedData.getCipherText().size; char *result = new char[allocationSize]; memcpy(result, headerSerializer.getBuffer().data(), headerSerializer.getBuffer().size()); memcpy(result + headerSerializer.getBuffer().size(), encryptedData.getNonce().data, encryptedData.getNonce().size); memcpy(result + headerSerializer.getBuffer().size() + encryptedData.getNonce().size, encryptedData.getCipherText().data, encryptedData.getCipherText().size); return DataView(result, allocationSize); } DatabaseCreateResponse::DatabaseCreateResponse(const shared_ptr &_key, const shared_ptr &_hash) : key(move(_key)), hash(_hash) { } const shared_ptr DatabaseCreateResponse::getNodeEncryptionKey() const { return key; } const shared_ptr DatabaseCreateResponse::getRequestHash() const { return hash; } Database::Database(const char *bootstrapNodeAddr, u16 port, boost::filesystem::path storageDir) { // TODO: Cache this in storage. It takes pretty long time to generate new identity auto identity = dht::crypto::generateIdentity(); node.run(port , { /*.dht_config = */{ /*.node_config = */{ /*.node_id = */{}, /*.network = */0, /*.is_bootstrap = */false, /*.maintain_storage*/false }, /*.id = */identity }, /*.threaded = */true, /*.proxy_server = */"", /*.push_node_id = */"" }); fmt::MemoryWriter portStr; portStr << port; node.bootstrap(bootstrapNodeAddr, portStr.c_str()); // TODO: Make this work for multiple threads initializing database at same time ++databaseCount; if(databaseCount == 1) { if(ntpThread) delete ntpThread; ntpThread = new thread([]() { ntp::NtpClient ntpClient("pool.ntp.org"); while(databaseCount > 0) { ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp(); timeOffset = time(nullptr) - ntpTimestamp.seconds; timestampSynced = true; // TODO: Also use timestamp fraction (milliseconds) this_thread::sleep_for(60s); } timestampSynced = false; }); // TODO: Catch std::system_error instead of this if-statement if(ntpThread->joinable()) ntpThread->detach(); } while(!timestampSynced) { this_thread::sleep_for(10ms); } } Database::~Database() { // TODO: Make this work for multiple threads removing database object at same time --databaseCount; node.join(); } void Database::seed(const shared_ptr hash, const shared_ptr encryptionKey) { // TODO: Use cached files and seed those. If none exists, request new files to seed. // If nobody requests my cached files in a long time, request new files to seed and remove cached files // (only if there are plenty of other seeders for the cached files. This could also cause race issue // where all nodes with a cached file delete it at same time) printf("Seeding key: %s\n", hash->toString().c_str()); DhtKey dhtKey(*hash); node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr &value) { return listenAddData(value, *hash, encryptionKey); }); u8 responseKey[OPENDHT_INFOHASH_LEN]; randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN); // TODO: If this response key is spammed, generate a new one node.listen(InfoHash(responseKey, OPENDHT_INFOHASH_LEN), [this, hash, encryptionKey](const shared_ptr &value) { const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *hash) return listenCreateData(value, requestHash, encryptionKey); else return listenAddData(value, requestHash, encryptionKey); }); // TODO: Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. node.listen(dhtKey.getRequestOldDataKey(), [this, hash](const shared_ptr &value) { printf("Request: Got request to send old data\n"); try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); u64 dataStartTimestamp = deserializer.extract(); u8 requestResponseKey[OPENDHT_INFOHASH_LEN]; deserializer.extract(requestResponseKey, OPENDHT_INFOHASH_LEN); auto requestedData = databaseStorage.getStorage(*hash); if(!requestedData) { fprintf(stderr, "Warning: No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); return true; } if(dataStartTimestamp == 0) { printf("Request: Sent create packet to requesting peer\n"); node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value(requestedData->createData, requestedData->createDataSize), [](bool ok) { if(!ok) fprintf(stderr, "Failed to put response for old data\n"); }); } else { assert(false); printf("TODO: Send 'add' packets to requesting remote peer\n"); } } catch (sibs::DeserializeException &e) { fprintf(stderr, "Warning: Failed to deserialize 'get old data' request: %s\n", e.what()); } return true; }); sibs::SafeSerializer serializer; serializer.add((u64)0); // Timestamp in microseconds, fetch data newer than this serializer.add(responseKey, OPENDHT_INFOHASH_LEN); node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) { if(!ok) fprintf(stderr, "Failed to put request to get old data\n"); }); //node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); //node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1)); } unique_ptr Database::create(const LocalUser *owner, const std::string &name) { // Header sibs::SafeSerializer serializer; serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version // TODO: Append fractions to get real microseconds time u64 timestampMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; serializer.add(timestampMicroseconds); serializer.add((u8*)owner->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES); // Encrypted body sibs::SafeSerializer encryptedSerializer; assert(owner->getName().size() <= 255); encryptedSerializer.add((u8)owner->getName().size()); encryptedSerializer.add((u8*)owner->getName().data(), owner->getName().size()); assert(name.size() <= 255); encryptedSerializer.add((u8)name.size()); encryptedSerializer.add((u8*)name.data(), name.size()); try { Encryption encryptedBody(DataView(encryptedSerializer.getBuffer().data(), encryptedSerializer.getBuffer().size())); DataView requestData = combine(serializer, encryptedBody); shared_ptr hashRequestKey = make_shared(requestData.data, requestData.size); auto adminGroup = new Group("administrator"); adminGroup->addUser(owner); databaseStorage.createStorage(*hashRequestKey, adminGroup, timestampMicroseconds, (const u8*)requestData.data, requestData.size); stagedCreateObjects.emplace_back(make_unique(requestData, hashRequestKey)); assert(encryptedBody.getKey().size == KEY_BYTE_SIZE); char *key = new char[encryptedBody.getKey().size]; memcpy(key, encryptedBody.getKey().data, encryptedBody.getKey().size); return make_unique(make_shared(key), hashRequestKey); } catch (EncryptionException &e) { throw DatabaseCreateException("Failed to encrypt data for 'create' request"); } } void Database::add(const LocalUser *owner, const Key &key, DataView data) { #if 0 if(nodeEncryptionKeys.find(key) == nodeEncryptionKeys.end()) throw DatabaseAddException("Data for key needs to be created before data can be appended to it"); unique_ptr signedData = make_unique(owner->getPrivateKey().sign(data)); // TODO: Append fractions to get real microseconds time u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; stagedAddObjects.emplace_back(StagedAddObject(key, move(signedData), timeMicroseconds, owner->getPublicKey())); #endif } void Database::commit() { // TODO: Combine staged objects into one object for efficiency. // TODO: Add rollback try { printf("Num objects to create: %zu\n", stagedCreateObjects.size()); for(const auto &stagedObject : stagedCreateObjects) { commitStagedCreateObject(stagedObject); } } catch (exception &e) { fprintf(stderr, "Error: Failed to commit, reason: %s\n", e.what()); } for(const auto &stagedObject : stagedCreateObjects) { free(stagedObject->encryptedBody.data); } stagedCreateObjects.clear(); #if 0 printf("Num objects to add: %d\n", stagedAddObjects.size()); for(StagedAddObject &stagedObject : stagedAddObjects) { commitStagedAddObject(stagedObject); } stagedAddObjects.clear(); #endif // TODO: Add node.listen here to get notified when remote peers got the commit, then we can say we can return } void Database::commitStagedCreateObject(const unique_ptr &stagedObject) { DhtKey dhtKey(*stagedObject->requestKey); Value createDataValue((u8*)stagedObject->encryptedBody.data, stagedObject->encryptedBody.size); node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok) { // TODO: Handle failure to put data if(!ok) fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); }/* TODO: How to make this work?, time_point(), false*/); } void Database::commitStagedAddObject(const DataView &stagedObject) { #if 0 // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) sibs::SafeSerializer headerSerializer; assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); headerSerializer.add(stagedObject.key.hashedKey.data(), OPENDHT_INFOHASH_LEN); headerSerializer.add(stagedObject.timestamp); sibs::SafeSerializer bodySerializer; bodySerializer.add((u8*)stagedObject.creatorPublicKey.getData(), PUBLIC_KEY_NUM_BYTES); assert(stagedObject.data->size() < 0xFFFF - 120); bodySerializer.add((u16)stagedObject.data->size()); bodySerializer.add((u8*)stagedObject.data->data(), stagedObject.data->size()); EncryptedData encryptedData; if(encrypt(&encryptedData, (EncryptionKey*)nodeEncryptionKeys[stagedObject.key], bodySerializer.getBuffer().data(), bodySerializer.getBuffer().size()) < 0) throw CommitAddException("Failed to encrypt staged add object"); Blob serializedData; combine(&serializedData, headerSerializer, encryptedData); // TODO: Verify if serializer buffer needs to survive longer than this scope Value addDataValue(move(serializedData)); node.put(ADD_DATA_HASH, move(addDataValue), [](bool ok) { // TODO: Handle failure to put data if(!ok) fprintf(stderr, "Failed to put for all: %s, what to do?\n", "commitStagedAddObject"); }); // Post data for listeners of this key /* Value putKeyValue(serializer.getBuffer().data() + OPENDHT_INFOHASH_LEN, serializer.getBuffer().size() - OPENDHT_INFOHASH_LEN); node.put(stagedObject.key.hashedKey, move(putKeyValue), [](bool ok) { // TODO: Handle failure to put data if(!ok) fprintf(stderr, "Failed to put for listeners: %s, what to do?\n", "commitStagedAddObject"); }); */ #endif } ntp::NtpTimestamp Database::getSyncedTimestampUtc() const { assert(timestampSynced); ntp::NtpTimestamp timestamp; timestamp.seconds = time(nullptr) - timeOffset; timestamp.fractions = 0; // TODO: Set this return timestamp; } DatabaseCreateRequest Database::deserializeCreateRequest(const std::shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); u16 packetStructureVersion = deserializer.extract(); if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION) { string errMsg = "Received 'create' request with packet structure version "; errMsg += to_string(packetStructureVersion); errMsg += ", but our packet structure version is "; errMsg += to_string(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); throw sibs::DeserializeException(errMsg); } u64 creationDate = deserializer.extract(); // TODO: Append fractions to get real microseconds time u64 timestampMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; if(creationDate > timestampMicroseconds) throw sibs::DeserializeException("Packet is from the future"); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); Signature::PublicKey userPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); if(deserializer.getSize() < NONCE_BYTE_SIZE) throw sibs::DeserializeException("Unsigned encrypted body is too small (unable to extract nonce)"); auto adminGroup = new Group("administrator"); auto creatorUser = RemoteUser::create(userPublicKey, ""); // Username is encrypted, we dont know it... adminGroup->addUser(creatorUser); databaseStorage.createStorage(hash, adminGroup, creationDate, value->data.data(), value->data.size()); u8 nonce[NONCE_BYTE_SIZE]; deserializer.extract(nonce, NONCE_BYTE_SIZE); DataView dataToDecrypt((void*)deserializer.getBuffer(), deserializer.getSize()); Decryption decryptedBody(dataToDecrypt, DataView(nonce, NONCE_BYTE_SIZE), DataView(*encryptionKey, KEY_BYTE_SIZE)); sibs::SafeDeserializer bodyDeserializer((const u8*)decryptedBody.getDecryptedText().data, decryptedBody.getDecryptedText().size); u8 creatorNameLength = bodyDeserializer.extract(); string creatorName; creatorName.resize(creatorNameLength); bodyDeserializer.extract((u8*)&creatorName[0], creatorNameLength); u8 nameLength = bodyDeserializer.extract(); string name; name.resize(nameLength); bodyDeserializer.extract((u8*)&name[0], nameLength); return { creationDate, adminGroup, move(name) }; } DatabaseAddRequest Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) { /* StagedAddObject result; sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); result.timestamp = deserializer.extract(); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); u16 dataSize = deserializer.extract(); if(dataSize < SIGNED_HASH_SIZE) throw sibs::DeserializeException("Signed data is too small"); string signedData; signedData.resize(dataSize); deserializer.extract((u8*)&signedData[0], dataSize); result.data = make_unique(); result.data->resize(dataSize); result.data = make_unique(creatorPublicKey.unsign(DataView((void*)signedData.data(), signedData.size()))); return result; */ Signature::PublicKey publicKey(nullptr, 0); DataView d; return { 0, 0, move(publicKey), d }; } bool Database::listenCreateData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) { printf("Got create data\n"); try { if(databaseStorage.getStorage(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); DatabaseCreateRequest createObject = deserializeCreateRequest(value, hash, encryptionKey); printf("Got create object, name: %s\n", createObject.name.c_str()); } catch (exception &e) { fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); } return true; } bool Database::listenAddData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) { printf("Got add data\n"); try { if(databaseStorage.getStorage(hash)) throw DatabaseStorageAlreadyExists("Add request hash is equal to hash already in storage (duplicate data?)"); // TODO: Verify createObject timestamp is not in the future //StagedAddObject addObject = deserializeAddRequest(value); //DataView data((void*)addObject.data->data(), addObject.data->size()); } catch (exception &e) { fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); } return true; } }