From 4e9ca6c06dc954f9deffe7d1680ef14c06990f2c Mon Sep 17 00:00:00 2001 From: dec05eba Date: Mon, 21 May 2018 01:02:11 +0200 Subject: Ping node before sending old data --- src/Database.cpp | 159 +++++++++++++++++++++++++++++++----------------- src/DatabaseStorage.cpp | 5 ++ 2 files changed, 107 insertions(+), 57 deletions(-) (limited to 'src') diff --git a/src/Database.cpp b/src/Database.cpp index 140ca85..f68739a 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -34,15 +34,21 @@ namespace odhtdb const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; + const int NODE_PUT_RETRY_TIMES = 3; - static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr value) + static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr value, int retryCounter = 0) { - node->put(infoHash, value, [node, infoHash, value](bool ok) + node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok) { if(!ok) { - Log::error("Failed to execute node.put, retrying..."); - nodePutWithRetry(node, infoHash, value); + if(retryCounter < NODE_PUT_RETRY_TIMES) + { + Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES); + nodePutWithRetry(node, infoHash, value, retryCounter + 1); + } + else + Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES); } }); } @@ -207,6 +213,70 @@ namespace odhtdb u64 start; u64 range; }; + + void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr requestResponseInfoHash, const shared_ptr value, usize valueOffset) + { + Log::debug("Request: Got request to send old data"); + try + { + sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset); + bool userWantsCreateNode = deserializer.extract() == 1; + DatabaseFetchOrder fetchOrder = deserializer.extract(); + + if(userWantsCreateNode) + { + Log::debug("Request: Peer wants CreateNode"); + databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) + { + Log::debug("Request: Sent create packet to requesting peer"); + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, *requestResponseInfoHash, value); + }); + } + + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap> actionGaps; + while(!deserializer.empty()) + { + u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; + deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract(); + u64 actionGapRange = deserializer.extract(); + + DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + } + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) + sendData = true; + else + { + for(const auto &userActionGaps : actionGapsIt->second) + { + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) + { + sendData = true; + break; + } + } + } + + if(!sendData) return; + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, *requestResponseInfoHash, value); + this_thread::sleep_for(chrono::milliseconds(50)); + }, fetchOrder); + } + catch (std::exception &e) + { + Log::warn("Failed while serving peer, error: %s", e.what()); + } + } void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder) { @@ -248,6 +318,16 @@ namespace odhtdb // TODO: If this response key is spammed, generate a new one. auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr value) { + if(value->data.size() == OPENDHT_INFOHASH_LEN) + { + sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + InfoHash pingResponseKey; + deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN); + shared_ptr responseValue = make_shared(); + nodePutWithRetry(&node, pingResponseKey, responseValue); + return true; + } + const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *nodeToSeed.getRequestHash()) return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey()); @@ -260,7 +340,6 @@ namespace odhtdb // This is to prevent too many peers from responding to a request to get old data. auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr value) { - Log::debug("Request: Got request to send old data"); try { static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); @@ -271,9 +350,9 @@ namespace odhtdb Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); return true; } - InfoHash requestResponseInfoHash; - deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN); - if(*responseKeyShared == requestResponseInfoHash) + shared_ptr requestResponseInfoHash = make_shared(); + deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN); + if(*responseKeyShared == *requestResponseInfoHash) { Log::debug("Request: Ignorning request for old data from ourself"); return true; @@ -281,57 +360,18 @@ namespace odhtdb else Log::debug("Request: Got request from somebody else"); - bool userWantsCreateNode = deserializer.extract() == 1; - DatabaseFetchOrder fetchOrder = deserializer.extract(); - - if(userWantsCreateNode) - { - Log::debug("Request: Peer wants CreateNode"); - databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) - { - Log::debug("Request: Sent create packet to requesting peer"); - shared_ptr value = make_shared((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, requestResponseInfoHash, value); - }); - } - - // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants - DataViewMap> actionGaps; - while(!deserializer.empty()) + u8 pingResponseKey[OPENDHT_INFOHASH_LEN]; + randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN); + InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN); + usize valueOffset = value->data.size() - deserializer.getSize(); + node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr _) { - u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; - deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); - u64 actionGapStart = deserializer.extract(); - u64 actionGapRange = deserializer.extract(); - - DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); - actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); - } + sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset); + return false; + }); - // TODO(Performance improvement): Instead of sending several packets, combine them into one - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) - { - bool sendData = false; - auto actionGapsIt = actionGaps.find(creatorPublicKey); - if(actionGapsIt == actionGaps.end()) - sendData = true; - else - { - for(const auto &userActionGaps : actionGapsIt->second) - { - if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) - { - sendData = true; - break; - } - } - } - - if(!sendData) return; - shared_ptr value = make_shared((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, requestResponseInfoHash, value); - this_thread::sleep_for(chrono::milliseconds(50)); - }, fetchOrder); + shared_ptr pingRequest = make_shared(pingResponseKey, OPENDHT_INFOHASH_LEN); + nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest); } catch (std::exception &e) { @@ -682,6 +722,11 @@ namespace odhtdb nodePutWithRetry(&node, requestKey, value); } + int Database::clearCache() + { + return databaseStorage.clearCache(); + } + dht::InfoHash Database::getInfoHash(const void *data, usize size) { return dht::InfoHash::get((const u8*)data, size); diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index b4c9a9e..6e4046d 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -1285,4 +1285,9 @@ namespace odhtdb auto time = chrono::high_resolution_clock::now().time_since_epoch(); auto timeMicroseconds = chrono::duration_cast(time).count(); } + + int DatabaseStorage::clearCache() + { + return sqlite3_db_release_memory(sqliteDb); + } } -- cgit v1.2.3