aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-05-21 01:02:11 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-05-21 01:02:13 +0200
commit45c0c31c54d4dd9f164db770ebede5087bbc8aef (patch)
tree93fd49fbfbff9b5be49791839d005226d5c87e60 /src
parente46344de86b92da7bb8c6e82c8f668fa5d20357c (diff)
Ping node before sending old data
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp159
-rw-r--r--src/DatabaseStorage.cpp5
2 files changed, 107 insertions, 57 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 140ca85..f68739a 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -34,15 +34,21 @@ namespace odhtdb
const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;
+ const int NODE_PUT_RETRY_TIMES = 3;
- static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value)
+ static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value, int retryCounter = 0)
{
- node->put(infoHash, value, [node, infoHash, value](bool ok)
+ node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok)
{
if(!ok)
{
- Log::error("Failed to execute node.put, retrying...");
- nodePutWithRetry(node, infoHash, value);
+ if(retryCounter < NODE_PUT_RETRY_TIMES)
+ {
+ Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES);
+ nodePutWithRetry(node, infoHash, value, retryCounter + 1);
+ }
+ else
+ Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES);
}
});
}
@@ -207,6 +213,70 @@ namespace odhtdb
u64 start;
u64 range;
};
+
+ void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr<InfoHash> requestResponseInfoHash, const shared_ptr<Value> value, usize valueOffset)
+ {
+ Log::debug("Request: Got request to send old data");
+ try
+ {
+ sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset);
+ bool userWantsCreateNode = deserializer.extract<u8>() == 1;
+ DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
+
+ if(userWantsCreateNode)
+ {
+ Log::debug("Request: Peer wants CreateNode");
+ databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
+ {
+ Log::debug("Request: Sent create packet to requesting peer");
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, *requestResponseInfoHash, value);
+ });
+ }
+
+ // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
+ DataViewMap<vector<ActionGap>> actionGaps;
+ while(!deserializer.empty())
+ {
+ u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
+ deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ u64 actionGapStart = deserializer.extract<u64>();
+ u64 actionGapRange = deserializer.extract<u64>();
+
+ DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
+ }
+
+ // TODO(Performance improvement): Instead of sending several packets, combine them into one
+ databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ {
+ bool sendData = false;
+ auto actionGapsIt = actionGaps.find(creatorPublicKey);
+ if(actionGapsIt == actionGaps.end())
+ sendData = true;
+ else
+ {
+ for(const auto &userActionGaps : actionGapsIt->second)
+ {
+ if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
+ {
+ sendData = true;
+ break;
+ }
+ }
+ }
+
+ if(!sendData) return;
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, *requestResponseInfoHash, value);
+ this_thread::sleep_for(chrono::milliseconds(50));
+ }, fetchOrder);
+ }
+ catch (std::exception &e)
+ {
+ Log::warn("Failed while serving peer, error: %s", e.what());
+ }
+ }
void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder)
{
@@ -248,6 +318,16 @@ namespace odhtdb
// TODO: If this response key is spammed, generate a new one.
auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value)
{
+ if(value->data.size() == OPENDHT_INFOHASH_LEN)
+ {
+ sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ InfoHash pingResponseKey;
+ deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN);
+ shared_ptr<Value> responseValue = make_shared<Value>();
+ nodePutWithRetry(&node, pingResponseKey, responseValue);
+ return true;
+ }
+
const Hash requestHash(value->data.data(), value->data.size());
if(requestHash == *nodeToSeed.getRequestHash())
return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey());
@@ -260,7 +340,6 @@ namespace odhtdb
// This is to prevent too many peers from responding to a request to get old data.
auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value)
{
- Log::debug("Request: Got request to send old data");
try
{
static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?");
@@ -271,9 +350,9 @@ namespace odhtdb
Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
return true;
}
- InfoHash requestResponseInfoHash;
- deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN);
- if(*responseKeyShared == requestResponseInfoHash)
+ shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
+ deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN);
+ if(*responseKeyShared == *requestResponseInfoHash)
{
Log::debug("Request: Ignorning request for old data from ourself");
return true;
@@ -281,57 +360,18 @@ namespace odhtdb
else
Log::debug("Request: Got request from somebody else");
- bool userWantsCreateNode = deserializer.extract<u8>() == 1;
- DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
-
- if(userWantsCreateNode)
- {
- Log::debug("Request: Peer wants CreateNode");
- databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
- {
- Log::debug("Request: Sent create packet to requesting peer");
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, requestResponseInfoHash, value);
- });
- }
-
- // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
- DataViewMap<vector<ActionGap>> actionGaps;
- while(!deserializer.empty())
+ u8 pingResponseKey[OPENDHT_INFOHASH_LEN];
+ randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN);
+ InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN);
+ usize valueOffset = value->data.size() - deserializer.getSize();
+ node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr<Value> _)
{
- u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
- deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
- u64 actionGapStart = deserializer.extract<u64>();
- u64 actionGapRange = deserializer.extract<u64>();
-
- DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
- actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
- }
+ sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset);
+ return false;
+ });
- // TODO(Performance improvement): Instead of sending several packets, combine them into one
- databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
- {
- bool sendData = false;
- auto actionGapsIt = actionGaps.find(creatorPublicKey);
- if(actionGapsIt == actionGaps.end())
- sendData = true;
- else
- {
- for(const auto &userActionGaps : actionGapsIt->second)
- {
- if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
- {
- sendData = true;
- break;
- }
- }
- }
-
- if(!sendData) return;
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, requestResponseInfoHash, value);
- this_thread::sleep_for(chrono::milliseconds(50));
- }, fetchOrder);
+ shared_ptr<Value> pingRequest = make_shared<Value>(pingResponseKey, OPENDHT_INFOHASH_LEN);
+ nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest);
}
catch (std::exception &e)
{
@@ -682,6 +722,11 @@ namespace odhtdb
nodePutWithRetry(&node, requestKey, value);
}
+ int Database::clearCache()
+ {
+ return databaseStorage.clearCache();
+ }
+
dht::InfoHash Database::getInfoHash(const void *data, usize size)
{
return dht::InfoHash::get((const u8*)data, size);
diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp
index b4c9a9e..6e4046d 100644
--- a/src/DatabaseStorage.cpp
+++ b/src/DatabaseStorage.cpp
@@ -1285,4 +1285,9 @@ namespace odhtdb
auto time = chrono::high_resolution_clock::now().time_since_epoch();
auto timeMicroseconds = chrono::duration_cast<chrono::microseconds>(time).count();
}
+
+ int DatabaseStorage::clearCache()
+ {
+ return sqlite3_db_release_memory(sqliteDb);
+ }
}