From 0b31186fe54cecd238583c060e7bd6ce9a9b1fe9 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 18 May 2018 04:21:49 +0200 Subject: Resend data if it fails --- src/Database.cpp | 79 ++++++++++++++++++++------------------------------------ src/Log.cpp | 6 ++--- 2 files changed, 31 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/Database.cpp b/src/Database.cpp index 68f7b9c..918cd4e 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -35,6 +35,18 @@ namespace odhtdb const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; + static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr value) + { + node->put(infoHash, value, [node, infoHash, value](bool ok) + { + if(!ok) + { + Log::error("Failed to execute node.put, retrying..."); + nodePutWithRetry(node, infoHash, value); + } + }); + } + class RequestQuarantineException : public runtime_error { public: @@ -277,12 +289,8 @@ namespace odhtdb databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { Log::debug("Request: Sent create packet to requesting peer"); - Value value((u8*)rawData.data, rawData.size); - node.put(requestResponseInfoHash, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to put response for old data for 'create' data"); - }); + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, requestResponseInfoHash, value); }); } @@ -319,12 +327,8 @@ namespace odhtdb } if(!sendData) return; - Value value((u8*)rawData.data, rawData.size); - node.put(requestResponseInfoHash, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to put response for old data for 'add' data"); - }); + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, requestResponseInfoHash, value); this_thread::sleep_for(chrono::milliseconds(50)); }, fetchOrder); } @@ -371,12 +375,8 @@ namespace odhtdb } Log::debug("Sending request for old data"); - Value requestValue(move(serializer.getBuffer())); - node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok) - { - if(!ok) - Log::warn("Failed to put request to get old data"); - }); + shared_ptr requestValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue); } void Database::stopSeeding(const Hash &nodeHash) @@ -424,13 +424,8 @@ namespace odhtdb databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size()); DhtKey dhtKey(*hashRequestKey); - Value createDataValue(move(serializer.getBuffer())); - node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::create"); - }); + shared_ptr createDataValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue); shared_ptr adminGroupIdResponse = make_shared(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -462,13 +457,8 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::addData"); - }); + shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -496,13 +486,8 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::addUser"); - }); + shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() const @@ -675,12 +660,8 @@ namespace odhtdb sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); if(!serializer.getBuffer().empty()) { - Value responseValue(move(serializer.getBuffer())); - node.put(responseKey, move(responseValue), [](bool ok) - { - if(!ok) - Log::error("Failed to respond to custom message"); - }); + shared_ptr responseValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, responseKey, responseValue); } return true; }); @@ -696,12 +677,8 @@ namespace odhtdb return callbackFunc(true, value->data.data(), value->data.size()); }); - Value value(move(data)); - node.put(requestKey, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to send custom message"); - }); + shared_ptr value = make_shared(move(data)); + nodePutWithRetry(&node, requestKey, value); } dht::InfoHash Database::getInfoHash(const void *data, usize size) diff --git a/src/Log.cpp b/src/Log.cpp index 0bdc0a6..f033c29 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -12,7 +12,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Debug: ", stdout); + fputs("\033[1;32mDebug:\033[0m ", stdout); vfprintf(stdout, fmt, args); fputs("\n", stdout); va_end(args); @@ -23,7 +23,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Warning: ", stdout); + fputs("\033[1;33mWarning:\033[0m ", stdout); vfprintf(stdout, fmt, args); fputs("\n", stdout); va_end(args); @@ -34,7 +34,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Error: ", stderr); + fputs("\033[1;31Error:\033[0m ", stderr); vfprintf(stderr, fmt, args); fputs("\n", stderr); va_end(args); -- cgit v1.2.3