From 5a48f2a4a5cf10c69e75e304a23ed2ca97582a1f Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Fri, 18 May 2018 04:21:49 +0200 Subject: Resend data if it fails --- README.md | 2 ++ src/Database.cpp | 79 ++++++++++++++++++------------------------------- src/Log.cpp | 6 ++-- tests/main.cpp | 90 +++++++++++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 113 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 78e8cc4..2d70bf8 100644 --- a/README.md +++ b/README.md @@ -48,3 +48,5 @@ Used by timestamp and action counter ## Packet sorting When you get packets from remote peers, you might get NodeAddData before Node, in that case the packets should not be discarded but there should be a flag for NodeAddData to handle such situations +## Resend packet +If you get disconnected before data is sent or you send data when everybody else is offline, then resend the data when you reconnect (dhtKey.getResendOldDataKey) diff --git a/src/Database.cpp b/src/Database.cpp index 68f7b9c..918cd4e 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -35,6 +35,18 @@ namespace odhtdb const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; + static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr value) + { + node->put(infoHash, value, [node, infoHash, value](bool ok) + { + if(!ok) + { + Log::error("Failed to execute node.put, retrying..."); + nodePutWithRetry(node, infoHash, value); + } + }); + } + class RequestQuarantineException : public runtime_error { public: @@ -277,12 +289,8 @@ namespace odhtdb databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { Log::debug("Request: Sent create packet to requesting peer"); - Value value((u8*)rawData.data, rawData.size); - node.put(requestResponseInfoHash, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to put response for old data for 'create' data"); - }); + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, requestResponseInfoHash, value); }); } @@ -319,12 +327,8 @@ namespace odhtdb } if(!sendData) return; - Value value((u8*)rawData.data, rawData.size); - node.put(requestResponseInfoHash, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to put response for old data for 'add' data"); - }); + shared_ptr value = make_shared((u8*)rawData.data, rawData.size); + nodePutWithRetry(&node, requestResponseInfoHash, value); this_thread::sleep_for(chrono::milliseconds(50)); }, fetchOrder); } @@ -371,12 +375,8 @@ namespace odhtdb } Log::debug("Sending request for old data"); - Value requestValue(move(serializer.getBuffer())); - node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok) - { - if(!ok) - Log::warn("Failed to put request to get old data"); - }); + shared_ptr requestValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue); } void Database::stopSeeding(const Hash &nodeHash) @@ -424,13 +424,8 @@ namespace odhtdb databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size()); DhtKey dhtKey(*hashRequestKey); - Value createDataValue(move(serializer.getBuffer())); - node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::create"); - }); + shared_ptr createDataValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue); shared_ptr adminGroupIdResponse = make_shared(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -462,13 +457,8 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::addData"); - }); + shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -496,13 +486,8 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - Log::warn("Failed to put: %s, what to do?", "Database::addUser"); - }); + shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); + nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() const @@ -675,12 +660,8 @@ namespace odhtdb sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); if(!serializer.getBuffer().empty()) { - Value responseValue(move(serializer.getBuffer())); - node.put(responseKey, move(responseValue), [](bool ok) - { - if(!ok) - Log::error("Failed to respond to custom message"); - }); + shared_ptr responseValue = make_shared(move(serializer.getBuffer())); + nodePutWithRetry(&node, responseKey, responseValue); } return true; }); @@ -696,12 +677,8 @@ namespace odhtdb return callbackFunc(true, value->data.data(), value->data.size()); }); - Value value(move(data)); - node.put(requestKey, move(value), [](bool ok) - { - if(!ok) - Log::error("Failed to send custom message"); - }); + shared_ptr value = make_shared(move(data)); + nodePutWithRetry(&node, requestKey, value); } dht::InfoHash Database::getInfoHash(const void *data, usize size) diff --git a/src/Log.cpp b/src/Log.cpp index 0bdc0a6..f033c29 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -12,7 +12,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Debug: ", stdout); + fputs("\033[1;32mDebug:\033[0m ", stdout); vfprintf(stdout, fmt, args); fputs("\n", stdout); va_end(args); @@ -23,7 +23,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Warning: ", stdout); + fputs("\033[1;33mWarning:\033[0m ", stdout); vfprintf(stdout, fmt, args); fputs("\n", stdout); va_end(args); @@ -34,7 +34,7 @@ namespace odhtdb std::lock_guard lock(mutexLog); va_list args; va_start(args, fmt); - fputs("Error: ", stderr); + fputs("\033[1;31Error:\033[0m ", stderr); vfprintf(stderr, fmt, args); fputs("\n", stderr); va_end(args); diff --git a/tests/main.cpp b/tests/main.cpp index 5d527fe..b5ce639 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -12,6 +12,7 @@ #include #include #include +#include using namespace std; using namespace chrono_literals; @@ -113,20 +114,20 @@ void testCachedIdentity() { pair, shared_ptr> identity = dht::crypto::generateIdentity(); dht::Blob privateKeyData = identity.first->serialize(); - printf("Private key size: %d, serialized data: %s\n", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); + Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); dht::crypto::PrivateKey privateKeyDeserialized(privateKeyData); privateKeyData = identity.first->serialize(); - printf("Private key size: %d, serialized data: %s\n", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); + Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str()); dht::Blob certificateData; identity.second->pack(certificateData); - printf("Certificate data size: %d, serialized data: %s\n", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); + Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); dht::crypto::Certificate certificateDeserialized(certificateData); certificateData.clear(); identity.second->pack(certificateData); - printf("Certificate data size: %d, serialized data: %s\n", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); + Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str()); } void testTimestamp(const Database &database) @@ -141,18 +142,17 @@ void testTimestamp(const Database &database) fail("Second timestamp is not more than first one for some reason"); } -int main() +void testStandard() { - Log::debug("Starting tests..."); - boost::filesystem::path storagePath("/tmp/odhtdbTest"); - boost::filesystem::remove_all(storagePath); - boost::filesystem::create_directory(storagePath); - testCachedIdentity(); testBinHexConvert(); testHash(); testEncryption(); + boost::filesystem::path storagePath("/tmp/odhtdbTest"); + boost::filesystem::remove_all(storagePath); + boost::filesystem::create_directory(storagePath); + int createNodeCounter = 0; int addDataCounter = 0; int addUserCounter = 0; @@ -303,6 +303,76 @@ int main() assertEquals((u32)10, receivedNumber); assertEquals((u32)20, sendCustomMessageResponseNumber); } +} + +void testTwoLocalNodes() +{ + boost::filesystem::path storagePath1("/tmp/odhtdbTest1"); + boost::filesystem::remove_all(storagePath1); + boost::filesystem::create_directory(storagePath1); + + boost::filesystem::path storagePath2("/tmp/odhtdbTest2"); + boost::filesystem::remove_all(storagePath2); + boost::filesystem::create_directory(storagePath2); + + auto createNodeCallback = [](const DatabaseCreateNodeRequest &request) + { + Log::debug("Create node callback"); + }; + + auto addNodeCallback = [](const DatabaseAddNodeRequest &request) + { + Log::debug("Add node callback"); + }; + + auto addUserCallback = [](const DatabaseAddUserRequest &request) + { + Log::debug("Add user callback"); + }; + + DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; + + Database database1("bootstrap.ring.cx", 4222, storagePath1, callbackFuncs); + auto databaseCreateResponse = database1.create(); + DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; + auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); + database1.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); + database1.seed(databaseNode); + + Database database2("bootstrap.ring.cx", 4223, storagePath2, callbackFuncs); + database2.seed(databaseNode); +} + +int main(int argc, char **argv) +{ + map> testByName; + testByName["standard"] = testStandard; + testByName["two_local_nodes"] = testTwoLocalNodes; + + const char *testName = "all"; + if(argc > 1) + testName = argv[1]; + + if(strcmp(testName, "all") == 0) + { + for(auto &testIt : testByName) + { + Log::debug("Running test: %s", testIt.first.c_str()); + testIt.second(); + } + } + else + { + auto testIt = testByName.find(testName); + if(testIt == testByName.end()) + { + Log::error("There is no test called %s", testName); + exit(1); + } + + Log::debug("Running test: %s", testIt->first.c_str()); + testIt->second(); + } return 0; } -- cgit v1.2.3