aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2018-05-18 04:21:49 +0200
committerdec05eba <dec05eba@protonmail.com>2020-08-18 23:25:46 +0200
commit0b31186fe54cecd238583c060e7bd6ce9a9b1fe9 (patch)
tree23c78609cf90c69a9a143eeb031c0baa8ff36bf9
parent8bc307024e1331811ccdea34d4eb5eb737b7c891 (diff)
Resend data if it fails
-rw-r--r--README.md2
-rw-r--r--src/Database.cpp79
-rw-r--r--src/Log.cpp6
-rw-r--r--tests/main.cpp90
4 files changed, 113 insertions, 64 deletions
diff --git a/README.md b/README.md
index 78e8cc4..2d70bf8 100644
--- a/README.md
+++ b/README.md
@@ -48,3 +48,5 @@ Used by timestamp and action counter
## Packet sorting
When you get packets from remote peers, you might get NodeAddData before Node, in that case the packets should not be discarded but there should be
a flag for NodeAddData to handle such situations
+## Resend packet
+If you get disconnected before data is sent or you send data when everybody else is offline, then resend the data when you reconnect (dhtKey.getResendOldDataKey)
diff --git a/src/Database.cpp b/src/Database.cpp
index 68f7b9c..918cd4e 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -35,6 +35,18 @@ namespace odhtdb
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;
+ static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value)
+ {
+ node->put(infoHash, value, [node, infoHash, value](bool ok)
+ {
+ if(!ok)
+ {
+ Log::error("Failed to execute node.put, retrying...");
+ nodePutWithRetry(node, infoHash, value);
+ }
+ });
+ }
+
class RequestQuarantineException : public runtime_error
{
public:
@@ -277,12 +289,8 @@ namespace odhtdb
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
Log::debug("Request: Sent create packet to requesting peer");
- Value value((u8*)rawData.data, rawData.size);
- node.put(requestResponseInfoHash, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to put response for old data for 'create' data");
- });
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, requestResponseInfoHash, value);
});
}
@@ -319,12 +327,8 @@ namespace odhtdb
}
if(!sendData) return;
- Value value((u8*)rawData.data, rawData.size);
- node.put(requestResponseInfoHash, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to put response for old data for 'add' data");
- });
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, requestResponseInfoHash, value);
this_thread::sleep_for(chrono::milliseconds(50));
}, fetchOrder);
}
@@ -371,12 +375,8 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- Value requestValue(move(serializer.getBuffer()));
- node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok)
- {
- if(!ok)
- Log::warn("Failed to put request to get old data");
- });
+ shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue);
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -424,13 +424,8 @@ namespace odhtdb
databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size());
DhtKey dhtKey(*hashRequestKey);
- Value createDataValue(move(serializer.getBuffer()));
- node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::create");
- });
+ shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue);
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -462,13 +457,8 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::addData");
- });
+ shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -496,13 +486,8 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::addUser");
- });
+ shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc() const
@@ -675,12 +660,8 @@ namespace odhtdb
sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size());
if(!serializer.getBuffer().empty())
{
- Value responseValue(move(serializer.getBuffer()));
- node.put(responseKey, move(responseValue), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to respond to custom message");
- });
+ shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, responseKey, responseValue);
}
return true;
});
@@ -696,12 +677,8 @@ namespace odhtdb
return callbackFunc(true, value->data.data(), value->data.size());
});
- Value value(move(data));
- node.put(requestKey, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to send custom message");
- });
+ shared_ptr<Value> value = make_shared<Value>(move(data));
+ nodePutWithRetry(&node, requestKey, value);
}
dht::InfoHash Database::getInfoHash(const void *data, usize size)
diff --git a/src/Log.cpp b/src/Log.cpp
index 0bdc0a6..f033c29 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -12,7 +12,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Debug: ", stdout);
+ fputs("\033[1;32mDebug:\033[0m ", stdout);
vfprintf(stdout, fmt, args);
fputs("\n", stdout);
va_end(args);
@@ -23,7 +23,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Warning: ", stdout);
+ fputs("\033[1;33mWarning:\033[0m ", stdout);
vfprintf(stdout, fmt, args);
fputs("\n", stdout);
va_end(args);
@@ -34,7 +34,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Error: ", stderr);
+ fputs("\033[1;31Error:\033[0m ", stderr);
vfprintf(stderr, fmt, args);
fputs("\n", stderr);
va_end(args);
diff --git a/tests/main.cpp b/tests/main.cpp
index 5d527fe..b5ce639 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -12,6 +12,7 @@
#include <thread>
#include <opendht.h>
#include <boost/filesystem.hpp>
+#include <map>
using namespace std;
using namespace chrono_literals;
@@ -113,20 +114,20 @@ void testCachedIdentity()
{
pair<shared_ptr<dht::crypto::PrivateKey>, shared_ptr<dht::crypto::Certificate>> identity = dht::crypto::generateIdentity();
dht::Blob privateKeyData = identity.first->serialize();
- printf("Private key size: %d, serialized data: %s\n", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str());
+ Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str());
dht::crypto::PrivateKey privateKeyDeserialized(privateKeyData);
privateKeyData = identity.first->serialize();
- printf("Private key size: %d, serialized data: %s\n", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str());
+ Log::debug("Private key size: %d, serialized data: %s", privateKeyData.size(), Hash(privateKeyData.data(), privateKeyData.size()).toString().c_str());
dht::Blob certificateData;
identity.second->pack(certificateData);
- printf("Certificate data size: %d, serialized data: %s\n", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str());
+ Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str());
dht::crypto::Certificate certificateDeserialized(certificateData);
certificateData.clear();
identity.second->pack(certificateData);
- printf("Certificate data size: %d, serialized data: %s\n", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str());
+ Log::debug("Certificate data size: %d, serialized data: %s", certificateData.size(), Hash(certificateData.data(), certificateData.size()).toString().c_str());
}
void testTimestamp(const Database &database)
@@ -141,18 +142,17 @@ void testTimestamp(const Database &database)
fail("Second timestamp is not more than first one for some reason");
}
-int main()
+void testStandard()
{
- Log::debug("Starting tests...");
- boost::filesystem::path storagePath("/tmp/odhtdbTest");
- boost::filesystem::remove_all(storagePath);
- boost::filesystem::create_directory(storagePath);
-
testCachedIdentity();
testBinHexConvert();
testHash();
testEncryption();
+ boost::filesystem::path storagePath("/tmp/odhtdbTest");
+ boost::filesystem::remove_all(storagePath);
+ boost::filesystem::create_directory(storagePath);
+
int createNodeCounter = 0;
int addDataCounter = 0;
int addUserCounter = 0;
@@ -303,6 +303,76 @@ int main()
assertEquals((u32)10, receivedNumber);
assertEquals((u32)20, sendCustomMessageResponseNumber);
}
+}
+
+void testTwoLocalNodes()
+{
+ boost::filesystem::path storagePath1("/tmp/odhtdbTest1");
+ boost::filesystem::remove_all(storagePath1);
+ boost::filesystem::create_directory(storagePath1);
+
+ boost::filesystem::path storagePath2("/tmp/odhtdbTest2");
+ boost::filesystem::remove_all(storagePath2);
+ boost::filesystem::create_directory(storagePath2);
+
+ auto createNodeCallback = [](const DatabaseCreateNodeRequest &request)
+ {
+ Log::debug("Create node callback");
+ };
+
+ auto addNodeCallback = [](const DatabaseAddNodeRequest &request)
+ {
+ Log::debug("Add node callback");
+ };
+
+ auto addUserCallback = [](const DatabaseAddUserRequest &request)
+ {
+ Log::debug("Add user callback");
+ };
+
+ DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback };
+
+ Database database1("bootstrap.ring.cx", 4222, storagePath1, callbackFuncs);
+ auto databaseCreateResponse = database1.create();
+ DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() };
+ auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair();
+ database1.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 });
+ database1.seed(databaseNode);
+
+ Database database2("bootstrap.ring.cx", 4223, storagePath2, callbackFuncs);
+ database2.seed(databaseNode);
+}
+
+int main(int argc, char **argv)
+{
+ map<string, function<void()>> testByName;
+ testByName["standard"] = testStandard;
+ testByName["two_local_nodes"] = testTwoLocalNodes;
+
+ const char *testName = "all";
+ if(argc > 1)
+ testName = argv[1];
+
+ if(strcmp(testName, "all") == 0)
+ {
+ for(auto &testIt : testByName)
+ {
+ Log::debug("Running test: %s", testIt.first.c_str());
+ testIt.second();
+ }
+ }
+ else
+ {
+ auto testIt = testByName.find(testName);
+ if(testIt == testByName.end())
+ {
+ Log::error("There is no test called %s", testName);
+ exit(1);
+ }
+
+ Log::debug("Running test: %s", testIt->first.c_str());
+ testIt->second();
+ }
return 0;
}