From 8daf7b3165c65a932a7d8eae1f0a640199892ca9 Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Mon, 14 May 2018 20:13:24 +0200 Subject: Only download nodes that we are missing --- src/Database.cpp | 129 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 32 deletions(-) (limited to 'src/Database.cpp') diff --git a/src/Database.cpp b/src/Database.cpp index 9985bb9..fdafab8 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -24,13 +24,13 @@ static time_t timeOffset = 0; // Updated by comparing local time with ntp server static odhtdb::u64 timeOffsetFraction = 0; static thread *ntpThread = nullptr; static bool timestampSynced = false; -static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); -static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); const int OPENDHT_INFOHASH_LEN = 20; namespace odhtdb { + static boost::uuids::random_generator uuidGen; + const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; @@ -40,23 +40,23 @@ namespace odhtdb RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {} }; - DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) + OwnedMemory combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) { usize allocationSize = headerSerializer.getBuffer().size() + encryptedData.getNonce().size + encryptedData.getCipherText().size; char *result = new char[allocationSize]; memcpy(result, headerSerializer.getBuffer().data(), headerSerializer.getBuffer().size()); memcpy(result + headerSerializer.getBuffer().size(), encryptedData.getNonce().data, encryptedData.getNonce().size); memcpy(result + headerSerializer.getBuffer().size() + encryptedData.getNonce().size, encryptedData.getCipherText().data, encryptedData.getCipherText().size); - return DataView(result, allocationSize); + return OwnedMemory(result, allocationSize); } - DataView combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData) + OwnedMemory combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData) { usize allocationSize = publicKey.getSize() + signedEncryptedData.size(); char *result = new char[allocationSize]; memcpy(result, publicKey.getData(), publicKey.getSize()); memcpy(result + publicKey.getSize(), signedEncryptedData.data(), signedEncryptedData.size()); - return DataView(result, allocationSize); + return OwnedMemory(result, allocationSize); } DatabaseCreateResponse::DatabaseCreateResponse(std::shared_ptr _nodeAdminKeyPair, std::shared_ptr _nodeAdminGroupId, shared_ptr _key, shared_ptr _hash) : @@ -163,6 +163,12 @@ namespace odhtdb --databaseCount; node.join(); } + + struct ActionGap + { + u64 start; + u64 range; + }; void Database::seed(const DatabaseNode &nodeToSeed) { @@ -211,7 +217,7 @@ namespace odhtdb }); newSeedInfo.responseKeyFuture = make_shared>(move(responseKeyFuture)); - // TODO: Before listening on this key, we should check how many remote peers are also providing this data. + // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed](const shared_ptr &value) { @@ -219,12 +225,12 @@ namespace odhtdb try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u64 dataStartTimestamp = deserializer.extract(); - u8 requestResponseKey[OPENDHT_INFOHASH_LEN]; - deserializer.extract(requestResponseKey, OPENDHT_INFOHASH_LEN); - InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN); + InfoHash requestResponseInfoHash; + deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN); + static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); + bool userWantsCreateNode = deserializer.extract() == 1; - if(dataStartTimestamp == 0) + if(userWantsCreateNode) { databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { @@ -238,8 +244,39 @@ namespace odhtdb }); } - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap> actionGaps; + while(!deserializer.empty()) { + u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; + deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract(); + u64 actionGapRange = deserializer.extract(); + + DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + } + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) + sendData = true; + else + { + for(const auto &userActionGaps : actionGapsIt->second) + { + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) + { + sendData = true; + break; + } + } + } + + if(!sendData) return; Value value((u8*)rawData.data, rawData.size); node.put(requestResponseInfoHash, move(value), [](bool ok) { @@ -255,13 +292,39 @@ namespace odhtdb return true; }); newSeedInfo.requestOldDataListenerFuture = make_shared>(move(requestOldDataListenerFuture)); - seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo; sibs::SafeSerializer serializer; - serializer.add((u64)0); // Timestamp in microseconds, fetch data newer than this. // TODO: Get timestamp from database storage serializer.add(responseKey, OPENDHT_INFOHASH_LEN); - node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) + bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash()); + serializer.add(iHaveCreateNode ? (u8)0 : (u8)1); + DataViewMap userLatestActionCounter; + + databaseStorage.fetchNodeUserActionGaps(*nodeToSeed.getRequestHash(), [&serializer, &userLatestActionCounter](const DataView userPublicKey, u64 actionGapStart, u64 actionGapRange) + { + serializer.add((const u8*)userPublicKey.data, PUBLIC_KEY_NUM_BYTES); + serializer.add(actionGapStart); + serializer.add(actionGapRange); + userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], actionGapStart + actionGapRange); + }); + + databaseStorage.fetchNodeUserLatestActionCounter(*nodeToSeed.getRequestHash(), [&userLatestActionCounter](const DataView userPublicKey, u64 latestActionCounter) + { + userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], latestActionCounter); + }); + + for(auto userLatestActionCounterData : userLatestActionCounter) + { + // Public key + serializer.add((const u8*)userLatestActionCounterData.first.data, PUBLIC_KEY_NUM_BYTES); + // Latest action counter start + serializer.add(userLatestActionCounterData.second); + // Latest action counter range (infinite range, meaning we want all packets older than start (latest known packet by user)) + serializer.add(~(u64)0ULL - userLatestActionCounterData.second); + } + + Value requestValue(move(serializer.getBuffer())); + node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok) { if(!ok) Log::warn("Failed to put request to get old data"); @@ -291,14 +354,12 @@ namespace odhtdb { shared_ptr creatorKeyPair = make_shared(); - // TODO: Should this be declared static? is there any difference in behavior/performance? - boost::uuids::random_generator uuidGen; auto adminGroupId = uuidGen(); assert(adminGroupId.size() == GROUP_ID_LENGTH); // Header sibs::SafeSerializer serializer; - serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version + serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add((u8*)creatorKeyPair->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES); @@ -353,20 +414,20 @@ namespace odhtdb u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add(DatabaseOperation::ADD_DATA); + u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1; + serializer.add(newActionCounter); DataView encryptionKey(nodeInfo.getNodeEncryptionKey()->data, ENCRYPTION_KEY_BYTE_SIZE); Encryption encryptedBody(dataToAdd, DataView(), encryptionKey); - DataView requestData = combine(serializer, encryptedBody); - string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData); - DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); + OwnedMemory requestData = combine(serializer, encryptedBody); + string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData.getView()); + OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); DataView encryptedDataView((char*)requestData.data + serializer.getBuffer().size(), requestData.size - serializer.getBuffer().size()); - databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); - delete[] (char*)requestData.data; + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(requestDataHash); Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - delete[] (char*)stagedAddObject.data; node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) { // TODO: Handle failure to put data @@ -391,26 +452,25 @@ namespace odhtdb u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add(DatabaseOperation::ADD_USER); + u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1; + serializer.add(newActionCounter); usize additionalDataOffset = serializer.getBuffer().size(); serializer.add((u8*)userToAddPublicKey.getData(), PUBLIC_KEY_NUM_BYTES); serializer.add((uint8_t*)groupToAddUserTo.data, groupToAddUserTo.size); - // TODO: Should this be declared static? is there any difference in behavior/performance? - boost::uuids::random_generator uuidGen; auto padding = uuidGen(); assert(padding.size() == 16); serializer.add(padding.data, padding.size()); DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() }; string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData); - DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); + OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); DataView additionalDataView((void*)(static_cast(requestData.data) + additionalDataOffset), requestData.size - additionalDataOffset); - databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(requestDataHash); Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - delete[] (char*)stagedAddObject.data; node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) { // TODO: Handle failure to put data @@ -511,8 +571,10 @@ namespace odhtdb */ DatabaseOperation operation = deserializerUnsigned.extract(); + u64 newActionCounter = deserializerUnsigned.extract(); + DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); - databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); + databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); } bool Database::listenCreateData(shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) @@ -520,6 +582,8 @@ namespace odhtdb Log::debug("Got create data"); try { + // This check is here to reduce processing, it doesn't matter much if the packet bypasses this, + // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesNodeExist(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); deserializeCreateRequest(value, hash, encryptionKey); @@ -536,10 +600,11 @@ namespace odhtdb Log::debug("Got add data"); try { + // This check is here to reduce processing, it doesn't matter much if the packet bypasses this, + // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesDataExist(requestDataHash)) throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)"); deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey); - //Log::debug("Got add object, timestamp: %zu", addObject.timestamp); } catch (RequestQuarantineException &e) { -- cgit v1.2.3