#include "../include/odhtdb/Database.hpp"
#include "../include/odhtdb/Group.hpp"
#include "../include/odhtdb/Encryption.hpp"
#include "../include/odhtdb/DhtKey.hpp"
#include "../include/odhtdb/bin2hex.hpp"
#include "../include/odhtdb/Log.hpp"
// NOTE(review): the angle-bracket include targets and all template arguments in this file
// were not visible in the reviewed text; they are reconstructed from the symbols used below.
// Confirm them against the project headers / build before merging.
#include <boost/uuid/uuid_generators.hpp>
#include <thread>
#include <chrono>
#include <atomic>
#include <sodium/randombytes.h>
#include <ntp/NtpClient.hpp>
#include <sibs/SafeSerializer.hpp>
#include <sibs/SafeDeserializer.hpp>
#include <cassert>

using namespace std;
using namespace chrono_literals;

// The variables below are shared between every Database instance and the detached NTP
// thread, so they are atomics: they are written by one thread and read by others.

// Number of live Database objects. The NTP thread keeps running while this is > 0.
static atomic<int> databaseCount{0};
// TODO: Verify time_t is always signed
// Local clock minus NTP clock, in seconds. Updated by comparing local time with ntp server.
static atomic<time_t> timeOffset{0};
// NOTE(review): computed as tv_usec (microseconds) minus the NTP "fractions" field. Those
// look like different units, which may be why the "packet is from the future" checks
// further down are disabled - confirm against the ntp library.
static atomic<odhtdb::u64> timeOffsetFraction{0};
static thread *ntpThread = nullptr;
// Becomes true after the first successful NTP sync; getSyncedTimestampUtc() waits for it.
static atomic<bool> timestampSynced{false};

namespace odhtdb
{
    static boost::uuids::random_generator uuidGen;

    const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
    const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
    const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;

    // Exception type caught by listenAddData(); nothing in this file throws it.
    class RequestQuarantineException : public runtime_error
    {
    public:
        RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {}
    };

    // Concatenates [serialized header][nonce][cipher text] into one newly allocated buffer.
    // Ownership of the allocation is handed to the returned OwnedByteArray.
    OwnedByteArray combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData)
    {
        const usize headerSize = headerSerializer.getBuffer().size();
        const usize nonceSize = encryptedData.getNonce().size;
        const usize cipherTextSize = encryptedData.getCipherText().size;

        usize allocationSize = headerSize + nonceSize + cipherTextSize;
        u8 *result = new u8[allocationSize];
        memcpy(result, headerSerializer.getBuffer().data(), headerSize);
        memcpy(result + headerSize, encryptedData.getNonce().data, nonceSize);
        memcpy(result + headerSize + nonceSize, encryptedData.getCipherText().data, cipherTextSize);
        return OwnedByteArray(result, allocationSize);
    }

    // Concatenates [public key][signed data] into one newly allocated buffer.
    // Ownership of the allocation is handed to the returned OwnedByteArray.
    OwnedByteArray combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData)
    {
        const usize publicKeySize = publicKey.getSize();

        usize allocationSize = publicKeySize + signedEncryptedData.size();
        u8 *result = new u8[allocationSize];
        memcpy(result, publicKey.getData(), publicKeySize);
        memcpy(result + publicKeySize, signedEncryptedData.data(), signedEncryptedData.size());
        return OwnedByteArray(result, allocationSize);
    }

    // Bundles everything produced by Database::create().
    DatabaseCreateResponse::DatabaseCreateResponse(std::shared_ptr<Signature::KeyPair> _nodeAdminKeyPair, std::shared_ptr<OwnedByteArray> _nodeAdminGroupId, shared_ptr<OwnedByteArray> _key, shared_ptr<Hash> _hash) :
        nodeAdminKeyPair(_nodeAdminKeyPair),
        nodeAdminGroupId(_nodeAdminGroupId),
        key(_key),
        hash(_hash)
    {

    }

    const shared_ptr<Signature::KeyPair> DatabaseCreateResponse::getNodeAdminKeyPair() const
    {
        return nodeAdminKeyPair;
    }

    const shared_ptr<OwnedByteArray> DatabaseCreateResponse::getNodeAdminGroupId() const
    {
        return nodeAdminGroupId;
    }

    const shared_ptr<OwnedByteArray> DatabaseCreateResponse::getNodeEncryptionKey() const
    {
        return key;
    }

    const shared_ptr<Hash> DatabaseCreateResponse::getRequestHash() const
    {
        return hash;
    }

    // Connects to the bootstrap node (blocking on the connect future), opens the storage in
    // @storageDir and starts two background jobs:
    //  - a process-wide detached NTP sync thread, started by the first live Database;
    //  - a per-instance thread that periodically persists the known remote peers.
    Database::Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
        bootstrapConnection(sibs::BootstrapConnection::connect(bootstrapNodeAddr).get()),
        databaseStorage(this, storageDir),
        onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc),
        onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc),
        onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc),
        shuttingDown(false)
    {
        // TODO: Make this fully work for multiple threads initializing database at same time
        // (the counter is atomic, but ntpThread itself is still unguarded).
        if(++databaseCount == 1)
        {
            // The previous thread object (if any) was detached, so deleting it only frees the handle
            if(ntpThread)
                delete ntpThread;

            const int ntpFetchTimestampRetries = 5;
            const int ntpFetchFailRetryCooldownSec = 60;

            ntpThread = new thread([ntpFetchFailRetryCooldownSec]()
            {
                // Nothing may escape a thread entry point: an uncaught exception calls std::terminate
                try
                {
                    ntp::NtpClient ntpClient("pool.ntp.org", 10);
                    while(databaseCount > 0)
                    {
                        int fetchRetryCounter = 0;
                        while(fetchRetryCounter < ntpFetchTimestampRetries)
                        {
                            try
                            {
                                ntp::NtpTimestamp ntpTimestamp = ntpClient.getTimestamp();
                                struct timeval currentLocalTime;
                                gettimeofday(&currentLocalTime, nullptr);

                                timeOffset = currentLocalTime.tv_sec - ntpTimestamp.seconds;
                                timeOffsetFraction = currentLocalTime.tv_usec - ntpTimestamp.fractions;
                                timestampSynced = true;
                                break;
                            }
                            catch(const ntp::NtpClientException &e)
                            {
                                Log::warn("Failed to sync clock with ntp server, reason: %s. Try #%d", e.what(), fetchRetryCounter);
                                // Before the first successful sync, retry immediately
                                if(timestampSynced)
                                    this_thread::sleep_for(3s);
                            }
                            ++fetchRetryCounter;
                        }

                        if(fetchRetryCounter == ntpFetchTimestampRetries)
                        {
                            // Previously this threw when the clock had never been synced, which
                            // terminated the whole process from this detached thread. Now we
                            // always cool down and try again.
                            Log::warn("Failed to retrieve ntp timestamp after %d retries, retrying in %d seconds", ntpFetchTimestampRetries, ntpFetchFailRetryCooldownSec);
                            this_thread::sleep_for(chrono::seconds(ntpFetchFailRetryCooldownSec));
                            // Callers of getSyncedTimestampUtc() are still waiting for the first
                            // sync, so skip the regular re-sync interval.
                            if(!timestampSynced)
                                continue;
                        }

                        this_thread::sleep_for(60s);
                    }
                }
                catch(const std::exception &e)
                {
                    Log::warn("Ntp sync thread stopped, reason: %s", e.what());
                }
            });
            ntpThread->detach();
        }

        remoteNodesSaveThread = thread([this]()
        {
            int saveIntervalMs = 5000; // 5 sec
            const int sleepDurationMs = 200;
            while(!shuttingDown)
            {
                // Sleep in small slices so shutdown is noticed within ~sleepDurationMs
                for(int i = 0; i < saveIntervalMs / sleepDurationMs; ++i)
                {
                    this_thread::sleep_for(chrono::milliseconds(sleepDurationMs));
                    if(shuttingDown)
                        return;
                }
                databaseStorage.setRemotePeers(bootstrapConnection->getPeers());
                // First save happens after 5 seconds, later saves every 30 seconds
                saveIntervalMs = 30000; // 30 sec
            }
        });
    }

    // Signals the peer-saving thread to stop and waits for it. The NTP thread stops on its
    // own once no Database objects remain.
    Database::~Database()
    {
        // TODO: Make this work for multiple threads removing database object at same time
        --databaseCount;
        shuttingDown = true;
        remoteNodesSaveThread.join();
    }

    // Constructs a Database on a detached thread. The returned future yields the database,
    // or rethrows whatever the constructor threw.
    std::future<std::unique_ptr<Database>> Database::connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs)
    {
        std::promise<std::unique_ptr<Database>> connectionPromise;
        std::future<std::unique_ptr<Database>> connectionFuture = connectionPromise.get_future();
        std::thread([](std::promise<std::unique_ptr<Database>> connectionPromise, sibs::Ipv4 bootstrapNodeAddr, const boost::filesystem::path storageDir, DatabaseCallbackFuncs callbackFuncs)
        {
            try
            {
                std::unique_ptr<Database> database(new Database(bootstrapNodeAddr, storageDir, callbackFuncs));
                connectionPromise.set_value(std::move(database));
            }
            catch(...)
            {
                connectionPromise.set_exception(std::current_exception());
            }
        }, std::move(connectionPromise), sibs::Ipv4(bootstrapNodeAddr, port), storageDir, callbackFuncs).detach();
        return connectionFuture;
    }

    // Inclusive range [start, start + range] of action counters requested by a peer
    struct ActionGap
    {
        u64 start;
        u64 range;
    };

    // Handles a "request old data" packet for @nodeToSeed.
    // Packet layout: version (u16), response key (PUBSUB_KEY_LENGTH bytes), wants-create-node
    // flag (u8), fetch order, then zero or more (public key, gap start u64, gap range u64).
    // Stored packets matching the request are put on the response key. Always returns true.
    // Deserialization errors propagate as exceptions to the caller.
    bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size)
    {
        Log::debug("Request: Got request to send old data");
        sibs::SafeDeserializer deserializer((const u8*)data, size);
        u16 requestStructureVersion = deserializer.extract<u16>();
        if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION)
        {
            Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
            return true;
        }

        shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
        deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH);

        bool userWantsCreateNode = deserializer.extract<u8>() == 1;
        DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();

        if(userWantsCreateNode)
        {
            Log::debug("Request: Peer wants CreateNode");
            databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
            {
                Log::debug("Request: Sent create packet to requesting peer");
                bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
            });
        }

        // Owns the key bytes that the DataView keys of @actionGaps point into
        vector<unique_ptr<u8[]>> userPublicKeys;

        // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
        DataViewMap<vector<ActionGap>> actionGaps;
        while(!deserializer.empty())
        {
            unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]);
            deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
            u64 actionGapStart = deserializer.extract<u64>();
            u64 actionGapRange = deserializer.extract<u64>();

            DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
            actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
            userPublicKeys.emplace_back(move(userPublicKeyRaw));
        }

        if(actionGaps.empty())
            Log::debug("No action gaps received, sending all data");

        // TODO(Performance improvement): Instead of sending several packets, combine them into one
        databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
        {
            bool sendData = false;
            auto actionGapsIt = actionGaps.find(creatorPublicKey);
            if(actionGapsIt == actionGaps.end())
            {
                Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str());
                sendData = true;
            }
            else
            {
                for(const auto &userActionGaps : actionGapsIt->second)
                {
                    // start and range come from the remote peer, so "start + range" may wrap
                    // around; compare the distance from start instead.
                    if(actionCounter >= userActionGaps.start && actionCounter - userActionGaps.start <= userActionGaps.range)
                    {
                        Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range);
                        sendData = true;
                        break;
                    }
                }
            }

            if(!sendData)
                return;

            bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
            // Pause between packets sent to the peer
            this_thread::sleep_for(chrono::milliseconds(50));
        }, fetchOrder);
        return true;
    }

    // Starts seeding @nodeToSeed: registers listeners for new data, for responses to our own
    // old-data request and for old-data requests from other peers, then publishes a request
    // for the data we are missing. Does nothing if the node is already being seeded.
    void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder)
    {
        if(seedInfoMap.find(*nodeToSeed.getRequestHash()) != seedInfoMap.end())
        {
            Log::warn("You are already seeding node %s, ignoring...", nodeToSeed.getRequestHash()->toString().c_str());
            return;
        }
        DatabaseSeedInfo newSeedInfo;

        // TODO: Use cached files and seed those. If none exists, request new files to seed.
        // If nobody requests my cached files in a long time, request new files to seed and remove cached files
        // (only if there are plenty of other seeders for the cached files. This could also cause race issue
        // where all nodes with a cached file delete it at same time).

        databaseStorage.setNodeDecryptionKey(*nodeToSeed.getRequestHash(), DataView(nodeToSeed.getNodeEncryptionKey()->data, nodeToSeed.getNodeEncryptionKey()->size));
        Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
        DhtKey dhtKey(*nodeToSeed.getRequestHash());

        newSeedInfo.newDataListenHandle = bootstrapConnection->listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
        {
            if(!peer)
                return true;

            Log::debug("Seed: New data listener received data...");
            const Hash requestHash(data, size);
            // A packet hashing to the node hash is the create packet; it is ignored on this key
            if(requestHash == *nodeToSeed.getRequestHash())
                return true;
            else
                return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
        });

        u8 responseKey[sibs::PUBSUB_KEY_LENGTH];
        randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH);
        shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, sibs::PUBSUB_KEY_LENGTH);
        newSeedInfo.reponseKeyInfoHash = responseKeyShared;

        // TODO: If this response key is spammed, generate a new one.
        newSeedInfo.responseKeyListenHandle = bootstrapConnection->listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
        {
            if(!peer)
                return true;

            const Hash requestHash(data, size);
            if(requestHash == *nodeToSeed.getRequestHash())
                return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey());
            else
                return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
        });

        // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
        // This is to prevent too many peers from responding to a request to get old data.
        newSeedInfo.requestOldDataListenHandle = bootstrapConnection->listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
        {
            if(!peer)
                return true;

            try
            {
                return sendOldDataToPeer(nodeToSeed, data, size);
            }
            catch (const std::exception &e)
            {
                Log::warn("Failed while serving peer, error: %s", e.what());
                return true;
            }
        });

        seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo;

        // Build our own old-data request; the layout is the one parsed by sendOldDataToPeer
        sibs::SafeSerializer serializer;
        serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
        serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH);
        bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash());
        // 1 means "please send me the create packet"
        serializer.add(iHaveCreateNode ? (u8)0 : (u8)1);
        serializer.add(fetchOrder);

        // Per user: the first action counter after everything we know about
        Signature::MapPublicKey<u64> userLatestActionCounter;
        databaseStorage.fetchNodeUserActionGaps(*nodeToSeed.getRequestHash(), [&serializer, &userLatestActionCounter](const DataView userPublicKeyRaw, u64 actionGapStart, u64 actionGapRange)
        {
            serializer.add((const u8*)userPublicKeyRaw.data, PUBLIC_KEY_NUM_BYTES);
            serializer.add(actionGapStart);
            serializer.add(actionGapRange);

            Signature::PublicKey userPublicKey((const char*)userPublicKeyRaw.data, userPublicKeyRaw.size);
            userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], actionGapStart + actionGapRange);
        });

        databaseStorage.fetchNodeUserLatestActionCounter(*nodeToSeed.getRequestHash(), [&userLatestActionCounter](const DataView userPublicKeyRaw, u64 latestActionCounter)
        {
            Signature::PublicKey userPublicKey((const char*)userPublicKeyRaw.data, userPublicKeyRaw.size);
            userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], latestActionCounter + 1);
        });

        // Bind by const reference: iterating by value copied every (public key, counter) pair
        for(const auto &userLatestActionCounterData : userLatestActionCounter)
        {
            // Public key
            serializer.add((const u8*)userLatestActionCounterData.first.getData(), PUBLIC_KEY_NUM_BYTES);
            // Latest action counter start
            serializer.add(userLatestActionCounterData.second);
            // Latest action counter range (open-ended: start + range equals the maximum u64 value,
            // so every packet from start onwards is requested)
            serializer.add(~(u64)0ULL - userLatestActionCounterData.second);
        }

        Log::debug("Sending request for old data");
        bootstrapConnection->put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
    }

    // Cancels the three listeners registered by seed() for @nodeHash and forgets the node.
    // Does nothing if the node is not being seeded.
    void Database::stopSeeding(const Hash &nodeHash)
    {
        auto seedInfoIt = seedInfoMap.find(nodeHash);
        if(seedInfoIt != seedInfoMap.end())
        {
            bootstrapConnection->cancelListen(seedInfoIt->second.newDataListenHandle);
            bootstrapConnection->cancelListen(seedInfoIt->second.requestOldDataListenHandle);
            bootstrapConnection->cancelListen(seedInfoIt->second.responseKeyListenHandle);
            seedInfoMap.erase(seedInfoIt);
        }
    }

    void Database::loadNode(const Hash &nodeHash, DatabaseLoadOrder loadOrder)
    {
        databaseStorage.loadNode(nodeHash, loadOrder);
    }

    // Creates a new node: generates the admin key pair, admin group id and encryption key,
    // stores the create packet locally, starts seeding it and publishes it.
    // The node hash is the hash of the serialized create packet.
    // Throws DatabaseCreateException if an EncryptionException is raised.
    unique_ptr<DatabaseCreateResponse> Database::create()
    {
        shared_ptr<Signature::KeyPair> creatorKeyPair = make_shared<Signature::KeyPair>();
        auto adminGroupId = uuidGen();
        assert(adminGroupId.size() == GROUP_ID_LENGTH);

        // Header
        sibs::SafeSerializer serializer;
        serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION);
        // Blocks until the clock has been synced with the ntp server
        u64 timestampCombined = getSyncedTimestampUtc().getCombined();
        serializer.add(timestampCombined);
        serializer.add((u8*)creatorKeyPair->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES);
        serializer.add(adminGroupId.data, adminGroupId.size());

        try
        {
            unsigned char *encryptionKeyRaw = new unsigned char[ENCRYPTION_KEY_BYTE_SIZE];
            Encryption::generateKey(encryptionKeyRaw);
            shared_ptr<OwnedByteArray> encryptionKey = make_shared<OwnedByteArray>(encryptionKeyRaw, ENCRYPTION_KEY_BYTE_SIZE);
            shared_ptr<Hash> hashRequestKey = make_shared<Hash>(serializer.getBuffer().data(), serializer.getBuffer().size());

            databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size));
            databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size());

            DatabaseNode databaseNode = { encryptionKey, hashRequestKey };
            seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);

            DhtKey dhtKey(*hashRequestKey);
            bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());

            shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
            memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
            return make_unique<DatabaseCreateResponse>(creatorKeyPair, adminGroupIdResponse, encryptionKey, hashRequestKey);
        }
        catch (const EncryptionException &e)
        {
            throw DatabaseCreateException("Failed to encrypt data for 'create' request");
        }
    }

    // Adds @dataToAdd to the node. The body is encrypted with the node key, the whole
    // request is signed by @userToPerformActionWith, stored locally and then published.
    // Published packet: [public key][signed(header + nonce + cipher text)].
    void Database::addData(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, DataView dataToAdd)
    {
        sibs::SafeSerializer serializer;
        serializer.add(DATABASE_ADD_PACKET_STRUCTURE_VERSION);
        u64 timestampCombined = getSyncedTimestampUtc().getCombined();
        serializer.add(timestampCombined);
        serializer.add(DatabaseOperation::ADD_DATA);
        u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
        serializer.add(newActionCounter);

        DataView encryptionKey(nodeInfo.getNodeEncryptionKey()->data, ENCRYPTION_KEY_BYTE_SIZE);
        Encryption encryptedBody(dataToAdd, encryptionKey);
        OwnedByteArray requestData = combine(serializer, encryptedBody);
        string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData.getView());
        OwnedByteArray stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
        Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
        // Everything after the header, i.e. nonce + cipher text
        DataView encryptedDataView((char*)requestData.data + serializer.getBuffer().size(), requestData.size - serializer.getBuffer().size());
        databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);

        DhtKey dhtKey(*nodeInfo.getRequestHash());
        bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
    }

    // Adds @userToAddPublicKey to group @groupToAddUserTo in the node. The request is
    // signed (not encrypted) by @userToPerformActionWith, stored locally and then published.
    void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
    {
        sibs::SafeSerializer serializer;
        serializer.add(DATABASE_ADD_PACKET_STRUCTURE_VERSION);
        u64 timestampCombined = getSyncedTimestampUtc().getCombined();
        serializer.add(timestampCombined);
        serializer.add(DatabaseOperation::ADD_USER);
        u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
        serializer.add(newActionCounter);
        // Operation-specific payload starts here: public key, group id, padding
        usize additionalDataOffset = serializer.getBuffer().size();

        serializer.add((u8*)userToAddPublicKey.getData(), PUBLIC_KEY_NUM_BYTES);
        serializer.add((uint8_t*)groupToAddUserTo.data, groupToAddUserTo.size);
        // 16 random bytes appended to the payload
        auto padding = uuidGen();
        assert(padding.size() == 16);
        serializer.add(padding.data, padding.size());

        DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() };
        string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData);
        OwnedByteArray stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
        Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
        DataView additionalDataView((void*)(static_cast<const char*>(requestData.data) + additionalDataOffset), requestData.size - additionalDataOffset);
        databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);

        DhtKey dhtKey(*nodeInfo.getRequestHash());
        bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
    }

    // Returns the local time corrected by the offset measured against the ntp server.
    // Blocks (polling every 10 ms) until the first successful sync.
    ntp::NtpTimestamp Database::getSyncedTimestampUtc()
    {
        while(!timestampSynced)
        {
            this_thread::sleep_for(10ms);
        }

        struct timeval currentLocalTime;
        gettimeofday(&currentLocalTime, nullptr);

        ntp::NtpTimestamp timestamp;
        timestamp.seconds = currentLocalTime.tv_sec - timeOffset;
        timestamp.fractions = currentLocalTime.tv_usec - timeOffsetFraction;
        return timestamp;
    }

    // Parses a create packet (version u16, creation date u64, creator public key, admin
    // group id) and creates the node storage for it.
    // Throws sibs::DeserializeException on a version mismatch.
    void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
    {
        sibs::SafeDeserializer deserializer((const u8*)data, size);
        u16 packetStructureVersion = deserializer.extract<u16>();
        if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION)
        {
            string errMsg = "Received 'create' request with packet structure version ";
            errMsg += to_string(packetStructureVersion);
            errMsg += ", but our packet structure version is ";
            errMsg += to_string(DATABASE_CREATE_PACKET_STRUCTURE_VERSION);
            throw sibs::DeserializeException(errMsg);
        }

        u64 creationDate = deserializer.extract<u64>();
        /*
        // TODO: This doesn't seem to work right now, fix it
        auto currentTimestamp = getSyncedTimestampUtc();
        if(creationDate > currentTimestamp.getCombined())
        {
            auto creationDateTimestamp = ntp::NtpTimestamp::fromCombined(creationDate);
            string errMsg = "Packet is from the future. Packet creation time: ";
            errMsg += to_string((double)creationDateTimestamp.seconds + creationDateTimestamp.getFractionAsSeconds());
            errMsg += ", current time: ";
            errMsg += to_string((double)currentTimestamp.seconds + currentTimestamp.getFractionAsSeconds());
            throw sibs::DeserializeException(errMsg);
        }
        */

        char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
        deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
        Signature::PublicKey userPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);

        uint8_t adminGroupId[GROUP_ID_LENGTH];
        deserializer.extract(adminGroupId, GROUP_ID_LENGTH);

        databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size);
    }

    // Parses an add packet: [creator public key][signed(version u16, creation date u64,
    // operation, action counter u64, operation-specific data)], unsigns it with the embedded
    // public key and appends it to the storage of @nodeHash.
    // Throws sibs::DeserializeException on a version mismatch.
    void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
    {
        sibs::SafeDeserializer deserializer((const u8*)data, size);
        char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
        deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
        Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);

        // Whatever remains after the public key is the signed payload
        DataView signedData((void*)deserializer.getBuffer(), deserializer.getSize());
        string unsignedData = creatorPublicKey.unsign(signedData);
        sibs::SafeDeserializer deserializerUnsigned((u8*)unsignedData.data(), unsignedData.size());

        u16 packetStructureVersion = deserializerUnsigned.extract<u16>();
        // Add packets are versioned by the ADD constant (the same one addData/addUser write);
        // this used to compare against the CREATE constant and report a 'create' request.
        if(packetStructureVersion != DATABASE_ADD_PACKET_STRUCTURE_VERSION)
        {
            string errMsg = "Received 'add' request with packet structure version ";
            errMsg += to_string(packetStructureVersion);
            errMsg += ", but our packet structure version is ";
            errMsg += to_string(DATABASE_ADD_PACKET_STRUCTURE_VERSION);
            throw sibs::DeserializeException(errMsg);
        }

        u64 creationDate = deserializerUnsigned.extract<u64>();
        // Only used by the disabled check below, but kept: the call also blocks until the
        // clock is synced, and removing it would change when packets get processed.
        auto currentTimestamp = getSyncedTimestampUtc();
        /*
        // TODO: This doesn't seem to work right now, fix it
        if(creationDate > currentTimestamp.getCombined())
        {
            auto creationDateTimestamp = ntp::NtpTimestamp::fromCombined(creationDate);
            string errMsg = "Packet is from the future. Packet creation time: ";
            errMsg += to_string((double)creationDateTimestamp.seconds + creationDateTimestamp.getFractionAsSeconds());
            errMsg += ", current time: ";
            errMsg += to_string((double)currentTimestamp.seconds + currentTimestamp.getFractionAsSeconds());
            throw sibs::DeserializeException(errMsg);
        }
        */

        DatabaseOperation operation = deserializerUnsigned.extract<DatabaseOperation>();
        u64 newActionCounter = deserializerUnsigned.extract<u64>();
        // Operation-specific remainder of the unsigned payload
        DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize());
        databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView);
    }

    // Listener entry point for create packets. Failures are logged, never propagated.
    // Always returns true.
    bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
    {
        Log::debug("Got create data in node %s", hash.toString().c_str());
        try
        {
            // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
            // the database has constraint to deal with this in multi-threaded way
            if(databaseStorage.doesNodeExist(hash))
                throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
            deserializeCreateRequest(data, size, hash, encryptionKey);
        }
        catch (const exception &e)
        {
            Log::warn("Failed to deserialize 'create' request: %s", e.what());
        }
        return true;
    }

    // Listener entry point for add packets. Failures are logged, never propagated.
    // Always returns true.
    bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
    {
        Log::debug("Got add data in node %s", nodeHash->toString().c_str());
        try
        {
            // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
            // the database has constraint to deal with this in multi-threaded way
            if(databaseStorage.doesDataExist(requestDataHash))
                throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)");
            deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey);
        }
        catch (const RequestQuarantineException &e)
        {
            Log::warn("Request was put in quarantine, will be processed later");
        }
        catch (const exception &e)
        {
            Log::warn("Failed to deserialize 'add' request: %s", e.what());
        }
        return true;
    }

    // The functions below forward directly to the database storage

    bool Database::doesStoredUserExist(const string &username) const
    {
        return databaseStorage.doesStoredUserExist(username);
    }

    void Database::storeUserWithoutNodes(const string &username, const string &password)
    {
        return databaseStorage.storeUserWithoutNodes(username, password);
    }

    void Database::storeNodeInfoForUserEncrypted(const DatabaseNode &nodeInfo, const string &username, const string &password, const Signature::KeyPair &keyPair)
    {
        return databaseStorage.storeNodeInfoForUserEncrypted(nodeInfo, username, password, keyPair);
    }

    MapHash<StoredNodeInfo> Database::getStoredNodeUserInfoDecrypted(const string &username, const string &password) const
    {
        return databaseStorage.getStoredNodeUserInfoDecrypted(username, password);
    }

    vector<OwnedByteArray> Database::getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const
    {
        return databaseStorage.getUserGroups(nodeHash, userPublicKey);
    }

    int Database::getUserLowestPermissionLevel(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const
    {
        return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey);
    }

    // Listens for custom messages on @receiveMessageKey. Whatever @callbackFunc returns is,
    // if non-empty, published on the response key (the receive key with its first byte
    // incremented by one).
    sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
    {
        InfoHash responseKey = receiveMessageKey;
        ++responseKey[0];
        return bootstrapConnection->listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
        {
            sibs::SafeSerializer serializer = callbackFunc(data, size);
            if(!serializer.getBuffer().empty())
            {
                bootstrapConnection->put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
            }
            return true;
        });
    }

    // Publishes a custom message on @requestKey without listening for a response
    void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size)
    {
        bootstrapConnection->put(requestKey.getKey(), data, size);
    }

    // Publishes a custom message on @requestKey and listens for responses on the matching
    // response key (the request key with its first byte incremented by one). The listener is
    // registered before the message is sent. Returns the listen handle for the response key.
    sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc)
    {
        InfoHash responseKey = requestKey;
        ++responseKey[0];
        auto listener = bootstrapConnection->listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
        {
            return callbackFunc(true, data, size);
        });

        sendCustomMessage(requestKey, data, size);
        return listener;
    }

    void Database::cancelNodeListener(sibs::ListenHandle &nodeListener)
    {
        bootstrapConnection->cancelListen(nodeListener);
    }

    int Database::clearCache()
    {
        return databaseStorage.clearCache();
    }

    InfoHash Database::getInfoHash(const void *data, usize size)
    {
        return InfoHash::generateHash((const u8*)data, size);
    }
}