aboutsummaryrefslogtreecommitdiff
path: root/src/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Database.cpp')
-rw-r--r--src/Database.cpp129
1 files changed, 97 insertions, 32 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 9985bb9..fdafab8 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -24,13 +24,13 @@ static time_t timeOffset = 0; // Updated by comparing local time with ntp server
static odhtdb::u64 timeOffsetFraction = 0;
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
-static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data");
-static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data");
const int OPENDHT_INFOHASH_LEN = 20;
namespace odhtdb
{
+ static boost::uuids::random_generator uuidGen;
+
const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
@@ -40,23 +40,23 @@ namespace odhtdb
RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {}
};
- DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData)
+ OwnedMemory combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData)
{
usize allocationSize = headerSerializer.getBuffer().size() + encryptedData.getNonce().size + encryptedData.getCipherText().size;
char *result = new char[allocationSize];
memcpy(result, headerSerializer.getBuffer().data(), headerSerializer.getBuffer().size());
memcpy(result + headerSerializer.getBuffer().size(), encryptedData.getNonce().data, encryptedData.getNonce().size);
memcpy(result + headerSerializer.getBuffer().size() + encryptedData.getNonce().size, encryptedData.getCipherText().data, encryptedData.getCipherText().size);
- return DataView(result, allocationSize);
+ return OwnedMemory(result, allocationSize);
}
- DataView combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData)
+ OwnedMemory combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData)
{
usize allocationSize = publicKey.getSize() + signedEncryptedData.size();
char *result = new char[allocationSize];
memcpy(result, publicKey.getData(), publicKey.getSize());
memcpy(result + publicKey.getSize(), signedEncryptedData.data(), signedEncryptedData.size());
- return DataView(result, allocationSize);
+ return OwnedMemory(result, allocationSize);
}
DatabaseCreateResponse::DatabaseCreateResponse(std::shared_ptr<Signature::KeyPair> _nodeAdminKeyPair, std::shared_ptr<OwnedMemory> _nodeAdminGroupId, shared_ptr<OwnedMemory> _key, shared_ptr<Hash> _hash) :
@@ -163,6 +163,12 @@ namespace odhtdb
--databaseCount;
node.join();
}
+
+ struct ActionGap
+ {
+ u64 start;
+ u64 range;
+ };
void Database::seed(const DatabaseNode &nodeToSeed)
{
@@ -211,7 +217,7 @@ namespace odhtdb
});
newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture));
- // TODO: Before listening on this key, we should check how many remote peers are also providing this data.
+ // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed](const shared_ptr<Value> &value)
{
@@ -219,12 +225,12 @@ namespace odhtdb
try
{
sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- u64 dataStartTimestamp = deserializer.extract<u64>();
- u8 requestResponseKey[OPENDHT_INFOHASH_LEN];
- deserializer.extract(requestResponseKey, OPENDHT_INFOHASH_LEN);
- InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN);
+ InfoHash requestResponseInfoHash;
+ deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN);
+ static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?");
+ bool userWantsCreateNode = deserializer.extract<u8>() == 1;
- if(dataStartTimestamp == 0)
+ if(userWantsCreateNode)
{
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
@@ -238,8 +244,39 @@ namespace odhtdb
});
}
- databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
+ // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
+ DataViewMap<vector<ActionGap>> actionGaps;
+ while(!deserializer.empty())
{
+ u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
+ deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ u64 actionGapStart = deserializer.extract<u64>();
+ u64 actionGapRange = deserializer.extract<u64>();
+
+ DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
+ }
+
+ // TODO(Performance improvement): Instead of sending several packets, combine them into one
+ databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ {
+ bool sendData = false;
+ auto actionGapsIt = actionGaps.find(creatorPublicKey);
+ if(actionGapsIt == actionGaps.end())
+ sendData = true;
+ else
+ {
+ for(const auto &userActionGaps : actionGapsIt->second)
+ {
+ if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
+ {
+ sendData = true;
+ break;
+ }
+ }
+ }
+
+ if(!sendData) return;
Value value((u8*)rawData.data, rawData.size);
node.put(requestResponseInfoHash, move(value), [](bool ok)
{
@@ -255,13 +292,39 @@ namespace odhtdb
return true;
});
newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture));
-
seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo;
sibs::SafeSerializer serializer;
- serializer.add((u64)0); // Timestamp in microseconds, fetch data newer than this. // TODO: Get timestamp from database storage
serializer.add(responseKey, OPENDHT_INFOHASH_LEN);
- node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok)
+ bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash());
+ serializer.add(iHaveCreateNode ? (u8)0 : (u8)1);
+ DataViewMap<u64> userLatestActionCounter;
+
+ databaseStorage.fetchNodeUserActionGaps(*nodeToSeed.getRequestHash(), [&serializer, &userLatestActionCounter](const DataView userPublicKey, u64 actionGapStart, u64 actionGapRange)
+ {
+ serializer.add((const u8*)userPublicKey.data, PUBLIC_KEY_NUM_BYTES);
+ serializer.add(actionGapStart);
+ serializer.add(actionGapRange);
+ userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], actionGapStart + actionGapRange);
+ });
+
+ databaseStorage.fetchNodeUserLatestActionCounter(*nodeToSeed.getRequestHash(), [&userLatestActionCounter](const DataView userPublicKey, u64 latestActionCounter)
+ {
+ userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], latestActionCounter);
+ });
+
+ for(auto userLatestActionCounterData : userLatestActionCounter)
+ {
+ // Public key
+ serializer.add((const u8*)userLatestActionCounterData.first.data, PUBLIC_KEY_NUM_BYTES);
+ // Latest action counter start
+ serializer.add(userLatestActionCounterData.second);
+ // Latest action counter range (infinite range, meaning we want all packets older than start (latest known packet by user))
+ serializer.add(~(u64)0ULL - userLatestActionCounterData.second);
+ }
+
+ Value requestValue(move(serializer.getBuffer()));
+ node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok)
{
if(!ok)
Log::warn("Failed to put request to get old data");
@@ -291,14 +354,12 @@ namespace odhtdb
{
shared_ptr<Signature::KeyPair> creatorKeyPair = make_shared<Signature::KeyPair>();
- // TODO: Should this be declared static? is there any difference in behavior/performance?
- boost::uuids::random_generator uuidGen;
auto adminGroupId = uuidGen();
assert(adminGroupId.size() == GROUP_ID_LENGTH);
// Header
sibs::SafeSerializer serializer;
- serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version
+ serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION);
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add((u8*)creatorKeyPair->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES);
@@ -353,20 +414,20 @@ namespace odhtdb
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add(DatabaseOperation::ADD_DATA);
+ u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
+ serializer.add(newActionCounter);
DataView encryptionKey(nodeInfo.getNodeEncryptionKey()->data, ENCRYPTION_KEY_BYTE_SIZE);
Encryption encryptedBody(dataToAdd, DataView(), encryptionKey);
- DataView requestData = combine(serializer, encryptedBody);
- string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData);
- DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
+ OwnedMemory requestData = combine(serializer, encryptedBody);
+ string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData.getView());
+ OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
DataView encryptedDataView((char*)requestData.data + serializer.getBuffer().size(), requestData.size - serializer.getBuffer().size());
- databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
- delete[] (char*)requestData.data;
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(requestDataHash);
Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- delete[] (char*)stagedAddObject.data;
node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
{
// TODO: Handle failure to put data
@@ -391,26 +452,25 @@ namespace odhtdb
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add(DatabaseOperation::ADD_USER);
+ u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
+ serializer.add(newActionCounter);
usize additionalDataOffset = serializer.getBuffer().size();
serializer.add((u8*)userToAddPublicKey.getData(), PUBLIC_KEY_NUM_BYTES);
serializer.add((uint8_t*)groupToAddUserTo.data, groupToAddUserTo.size);
- // TODO: Should this be declared static? is there any difference in behavior/performance?
- boost::uuids::random_generator uuidGen;
auto padding = uuidGen();
assert(padding.size() == 16);
serializer.add(padding.data, padding.size());
DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() };
string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData);
- DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
+ OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
DataView additionalDataView((void*)(static_cast<const char*>(requestData.data) + additionalDataOffset), requestData.size - additionalDataOffset);
- databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(requestDataHash);
Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- delete[] (char*)stagedAddObject.data;
node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
{
// TODO: Handle failure to put data
@@ -511,8 +571,10 @@ namespace odhtdb
*/
DatabaseOperation operation = deserializerUnsigned.extract<DatabaseOperation>();
+ u64 newActionCounter = deserializerUnsigned.extract<u64>();
+
DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize());
- databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
+ databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
}
bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedMemory> encryptionKey)
@@ -520,6 +582,8 @@ namespace odhtdb
Log::debug("Got create data");
try
{
+ // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
+ // the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesNodeExist(hash))
throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
deserializeCreateRequest(value, hash, encryptionKey);
@@ -536,10 +600,11 @@ namespace odhtdb
Log::debug("Got add data");
try
{
+ // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
+ // the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesDataExist(requestDataHash))
throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)");
deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey);
- //Log::debug("Got add object, timestamp: %zu", addObject.timestamp);
}
catch (RequestQuarantineException &e)
{