aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp79
-rw-r--r--src/Log.cpp6
2 files changed, 31 insertions, 54 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 68f7b9c..918cd4e 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -35,6 +35,18 @@ namespace odhtdb
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;
+ static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value)
+ {
+ node->put(infoHash, value, [node, infoHash, value](bool ok)
+ {
+ if(!ok)
+ {
+ Log::error("Failed to execute node.put, retrying...");
+ nodePutWithRetry(node, infoHash, value);
+ }
+ });
+ }
+
class RequestQuarantineException : public runtime_error
{
public:
@@ -277,12 +289,8 @@ namespace odhtdb
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
Log::debug("Request: Sent create packet to requesting peer");
- Value value((u8*)rawData.data, rawData.size);
- node.put(requestResponseInfoHash, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to put response for old data for 'create' data");
- });
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, requestResponseInfoHash, value);
});
}
@@ -319,12 +327,8 @@ namespace odhtdb
}
if(!sendData) return;
- Value value((u8*)rawData.data, rawData.size);
- node.put(requestResponseInfoHash, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to put response for old data for 'add' data");
- });
+ shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
+ nodePutWithRetry(&node, requestResponseInfoHash, value);
this_thread::sleep_for(chrono::milliseconds(50));
}, fetchOrder);
}
@@ -371,12 +375,8 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- Value requestValue(move(serializer.getBuffer()));
- node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok)
- {
- if(!ok)
- Log::warn("Failed to put request to get old data");
- });
+ shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue);
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -424,13 +424,8 @@ namespace odhtdb
databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size());
DhtKey dhtKey(*hashRequestKey);
- Value createDataValue(move(serializer.getBuffer()));
- node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::create");
- });
+ shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue);
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -462,13 +457,8 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::addData");
- });
+ shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -496,13 +486,8 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
- {
- // TODO: Handle failure to put data
- if(!ok)
- Log::warn("Failed to put: %s, what to do?", "Database::addUser");
- });
+ shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
+ nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc() const
@@ -675,12 +660,8 @@ namespace odhtdb
sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size());
if(!serializer.getBuffer().empty())
{
- Value responseValue(move(serializer.getBuffer()));
- node.put(responseKey, move(responseValue), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to respond to custom message");
- });
+ shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer()));
+ nodePutWithRetry(&node, responseKey, responseValue);
}
return true;
});
@@ -696,12 +677,8 @@ namespace odhtdb
return callbackFunc(true, value->data.data(), value->data.size());
});
- Value value(move(data));
- node.put(requestKey, move(value), [](bool ok)
- {
- if(!ok)
- Log::error("Failed to send custom message");
- });
+ shared_ptr<Value> value = make_shared<Value>(move(data));
+ nodePutWithRetry(&node, requestKey, value);
}
dht::InfoHash Database::getInfoHash(const void *data, usize size)
diff --git a/src/Log.cpp b/src/Log.cpp
index 0bdc0a6..f033c29 100644
--- a/src/Log.cpp
+++ b/src/Log.cpp
@@ -12,7 +12,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Debug: ", stdout);
+ fputs("\033[1;32mDebug:\033[0m ", stdout);
vfprintf(stdout, fmt, args);
fputs("\n", stdout);
va_end(args);
@@ -23,7 +23,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Warning: ", stdout);
+ fputs("\033[1;33mWarning:\033[0m ", stdout);
vfprintf(stdout, fmt, args);
fputs("\n", stdout);
va_end(args);
@@ -34,7 +34,7 @@ namespace odhtdb
std::lock_guard<std::mutex> lock(mutexLog);
va_list args;
va_start(args, fmt);
- fputs("Error: ", stderr);
+ fputs("\033[1;31Error:\033[0m ", stderr);
vfprintf(stderr, fmt, args);
fputs("\n", stderr);
va_end(args);