aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-05-14 20:13:24 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-05-14 20:13:27 +0200
commit8daf7b3165c65a932a7d8eae1f0a640199892ca9 (patch)
treec6d44529e4e2e0808841366f112700a0bb3d1bfd
parent1fe14e8fcfbec25cb6b70df194a82ede4cc0a3e5 (diff)
Only download nodes that we are missing
-rw-r--r--README.md8
-rw-r--r--Scheme.md4
-rw-r--r--include/odhtdb/DatabaseStorage.hpp14
-rw-r--r--include/odhtdb/sql/Sql.hpp37
-rw-r--r--include/odhtdb/sql/SqlExec.hpp33
-rw-r--r--include/odhtdb/sql/SqlQuery.hpp30
-rw-r--r--src/Database.cpp129
-rw-r--r--src/DatabaseStorage.cpp161
-rw-r--r--src/sql/Sql.cpp21
-rw-r--r--src/sql/SqlExec.cpp72
-rw-r--r--src/sql/SqlQuery.cpp14
11 files changed, 431 insertions, 92 deletions
diff --git a/README.md b/README.md
index 90c17fd..78e8cc4 100644
--- a/README.md
+++ b/README.md
@@ -40,3 +40,11 @@ Combine opendht with udt. Use opendht to find other peers and udt for communicat
Use a merkle tree and when requesting new nodes when connecting, send X latest hashes and other peers can send you where you have missing data and send you it.
If all X hashes are wrong, send older hashes.
However if we are only using opendht to find other peers, we might as well use https://github.com/DavidKeller/kademlia or https://github.com/ytakano/libcage
+## Safe multi-threading SQL handling
+Create SQL classes with mutex for performing prepared statements. Currently the database can crasah if DatabaseStorage methods are accessed at the same time
+from multiple threads
+## Find a way to deal with u64 in sqlite
+Used by timestamp and action counter
+## Packet sorting
+When you get packets from remote peers, you might get NodeAddData before Node, in that case the packets should not be discarded but there should be
+a flag for NodeAddData to handle such situations
diff --git a/Scheme.md b/Scheme.md
index 9a757d2..50aaa3e 100644
--- a/Scheme.md
+++ b/Scheme.md
@@ -12,6 +12,7 @@ Packet
packet structure version
timestamp (not strictly accurate, mostly used for visual representation)
operation type
+ user action counter
Body (Encrypted with node encryption key)
data
# Add user to group
@@ -22,7 +23,8 @@ Packet
packet structure version
timestamp (not strictly accurate, mostly used for visual representation)
operation type
+ user action counter
Body
public key (of user to add to group)
group id (the group which the user should be added to)
- PADDING (Random data, to ensure this packet gets an unique hash)
+ PADDING (Random data, to ensure this packet gets an unique hash)
diff --git a/include/odhtdb/DatabaseStorage.hpp b/include/odhtdb/DatabaseStorage.hpp
index 0d94c91..73b22b9 100644
--- a/include/odhtdb/DatabaseStorage.hpp
+++ b/include/odhtdb/DatabaseStorage.hpp
@@ -63,8 +63,10 @@ namespace odhtdb
const int PASSWORD_SALT_LEN = 16;
const int HASHED_PASSWORD_LEN = 32;
- using FetchNodeRawCallbackFunc = std::function<void(const DataView)>;
- using FetchNodeAddDataRawCallbackFunc = std::function<void(const DataView)>;
+ using FetchNodeRawCallbackFunc = std::function<void(const DataView rawData)>;
+ using FetchNodeAddDataRawCallbackFunc = std::function<void(const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)>;
+ using FetchNodeUserActionGapsCallbackFunc = std::function<void(const DataView userPublicKey, u64 start, u64 range)>;
+ using FetchNodeUserLatestActionCounterCallbackFunc = std::function<void(const DataView userPublicKey, u64 latestActionCounter)>;
class DatabaseStorage
{
@@ -83,7 +85,7 @@ namespace odhtdb
// Throws DatabaseStorageNotFound if data with @nodeHash hash has not been created yet.
// Throws DatabaseStorageAlreadyExists if same data has been added before (hash of @data, in @dataHash)
- void appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView);
+ void appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, u64 newUserActionCounter, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView);
// Throws DatabaseStorageAlreadyExists if group already exists in node
void addGroup(const Hash &nodeHash, const DataView &groupId, const Permission &permissions);
@@ -96,9 +98,15 @@ namespace odhtdb
void fetchNodeRaw(const Hash &nodeHash, FetchNodeRawCallbackFunc callbackFunc);
void fetchNodeAddDataRaw(const Hash &nodeHash, FetchNodeAddDataRawCallbackFunc callbackFunc);
+ void fetchNodeUserActionGaps(const Hash &nodeHash, FetchNodeUserActionGapsCallbackFunc callbackFunc);
+ void fetchNodeUserLatestActionCounter(const Hash &nodeHash, FetchNodeUserLatestActionCounterCallbackFunc callbackFunc);
+
bool isUserAllowedToAddDataInNode(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const;
bool isUserAllowedToAddUserToGroupInNode(const Hash &nodeHash, const Signature::PublicKey &userPublicKey, const DataView &groupToAddUserTo) const;
+ // Throws DatabaseStorageNotFound if user doesn't exist in node
+ u64 getUserActionCounter(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const;
+
// Username and key pair has to be unique, returns true on success
//bool storeLocalUser(const std::string &username, const Signature::KeyPair &keyPair, const std::string &password);
diff --git a/include/odhtdb/sql/Sql.hpp b/include/odhtdb/sql/Sql.hpp
new file mode 100644
index 0000000..6c78360
--- /dev/null
+++ b/include/odhtdb/sql/Sql.hpp
@@ -0,0 +1,37 @@
+#pragma once
+
+#include "../DataView.hpp"
+
+class sqlite3;
+class sqlite3_stmt;
+
+namespace odhtdb
+{
+ class SqlArg
+ {
+ public:
+ enum class Type : u8
+ {
+ DATA_VIEW,
+ INT,
+ INT64,
+ UINT64
+ };
+
+ SqlArg(const DataView &data) : dataView(data), type(Type::DATA_VIEW) {}
+ SqlArg(int data) : integer(data), type(Type::INT) {}
+ SqlArg(i64 data) : integer64(data), type(Type::INT64) {}
+ SqlArg(u64 data) : uinteger64(data), type(Type::UINT64) {}
+
+ int bind(sqlite3_stmt *stmt, int paramIndex) const;
+ private:
+ union
+ {
+ const DataView dataView;
+ const int integer;
+ const i64 integer64;
+ const u64 uinteger64;
+ };
+ const Type type;
+ };
+}
diff --git a/include/odhtdb/sql/SqlExec.hpp b/include/odhtdb/sql/SqlExec.hpp
new file mode 100644
index 0000000..07146ea
--- /dev/null
+++ b/include/odhtdb/sql/SqlExec.hpp
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "Sql.hpp"
+#include <utility>
+#include <stdexcept>
+#include <mutex>
+
+namespace odhtdb
+{
+ class SqlExecException : public std::runtime_error
+ {
+ public:
+ SqlExecException(const std::string &errMsg) : std::runtime_error(errMsg) {}
+ };
+
+ class SqlExec
+ {
+ public:
+ // Throws SqlExecException on failure
+ SqlExec(sqlite3 *db, const char *sql);
+ ~SqlExec();
+
+ // Throws SqlExecException on failure
+ void execWithArgs(std::initializer_list<SqlArg> args);
+
+ // Throws SqlExecException on failure
+ void exec();
+ private:
+ sqlite3 *db;
+ sqlite3_stmt *stmt;
+ std::mutex mutex;
+ };
+}
diff --git a/include/odhtdb/sql/SqlQuery.hpp b/include/odhtdb/sql/SqlQuery.hpp
index f26ee1a..615fa3f 100644
--- a/include/odhtdb/sql/SqlQuery.hpp
+++ b/include/odhtdb/sql/SqlQuery.hpp
@@ -1,12 +1,9 @@
#pragma once
-#include "../DataView.hpp"
+#include "Sql.hpp"
#include <utility>
#include <stdexcept>
-class sqlite3;
-class sqlite3_stmt;
-
namespace odhtdb
{
class SqlQueryException : public std::runtime_error
@@ -15,31 +12,6 @@ namespace odhtdb
SqlQueryException(const std::string &errMsg) : std::runtime_error(errMsg) {}
};
- class SqlArg
- {
- public:
- enum class Type : u8
- {
- DATA_VIEW,
- INT,
- INT64
- };
-
- SqlArg(const DataView &data) : dataView(data), type(Type::DATA_VIEW) {}
- SqlArg(int data) : integer(data), type(Type::INT) {}
- SqlArg(i64 data) : integer64(data), type(Type::INT64) {}
-
- int bind(sqlite3_stmt *stmt, int paramIndex) const;
- private:
- union
- {
- const DataView dataView;
- const int integer;
- const i64 integer64;
- };
- const Type type;
- };
-
class SqlQuery
{
public:
diff --git a/src/Database.cpp b/src/Database.cpp
index 9985bb9..fdafab8 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -24,13 +24,13 @@ static time_t timeOffset = 0; // Updated by comparing local time with ntp server
static odhtdb::u64 timeOffsetFraction = 0;
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
-static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data");
-static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data");
const int OPENDHT_INFOHASH_LEN = 20;
namespace odhtdb
{
+ static boost::uuids::random_generator uuidGen;
+
const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
@@ -40,23 +40,23 @@ namespace odhtdb
RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {}
};
- DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData)
+ OwnedMemory combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData)
{
usize allocationSize = headerSerializer.getBuffer().size() + encryptedData.getNonce().size + encryptedData.getCipherText().size;
char *result = new char[allocationSize];
memcpy(result, headerSerializer.getBuffer().data(), headerSerializer.getBuffer().size());
memcpy(result + headerSerializer.getBuffer().size(), encryptedData.getNonce().data, encryptedData.getNonce().size);
memcpy(result + headerSerializer.getBuffer().size() + encryptedData.getNonce().size, encryptedData.getCipherText().data, encryptedData.getCipherText().size);
- return DataView(result, allocationSize);
+ return OwnedMemory(result, allocationSize);
}
- DataView combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData)
+ OwnedMemory combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData)
{
usize allocationSize = publicKey.getSize() + signedEncryptedData.size();
char *result = new char[allocationSize];
memcpy(result, publicKey.getData(), publicKey.getSize());
memcpy(result + publicKey.getSize(), signedEncryptedData.data(), signedEncryptedData.size());
- return DataView(result, allocationSize);
+ return OwnedMemory(result, allocationSize);
}
DatabaseCreateResponse::DatabaseCreateResponse(std::shared_ptr<Signature::KeyPair> _nodeAdminKeyPair, std::shared_ptr<OwnedMemory> _nodeAdminGroupId, shared_ptr<OwnedMemory> _key, shared_ptr<Hash> _hash) :
@@ -163,6 +163,12 @@ namespace odhtdb
--databaseCount;
node.join();
}
+
+ struct ActionGap
+ {
+ u64 start;
+ u64 range;
+ };
void Database::seed(const DatabaseNode &nodeToSeed)
{
@@ -211,7 +217,7 @@ namespace odhtdb
});
newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture));
- // TODO: Before listening on this key, we should check how many remote peers are also providing this data.
+ // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed](const shared_ptr<Value> &value)
{
@@ -219,12 +225,12 @@ namespace odhtdb
try
{
sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- u64 dataStartTimestamp = deserializer.extract<u64>();
- u8 requestResponseKey[OPENDHT_INFOHASH_LEN];
- deserializer.extract(requestResponseKey, OPENDHT_INFOHASH_LEN);
- InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN);
+ InfoHash requestResponseInfoHash;
+ deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN);
+ static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?");
+ bool userWantsCreateNode = deserializer.extract<u8>() == 1;
- if(dataStartTimestamp == 0)
+ if(userWantsCreateNode)
{
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
@@ -238,8 +244,39 @@ namespace odhtdb
});
}
- databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
+ // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
+ DataViewMap<vector<ActionGap>> actionGaps;
+ while(!deserializer.empty())
{
+ u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
+ deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ u64 actionGapStart = deserializer.extract<u64>();
+ u64 actionGapRange = deserializer.extract<u64>();
+
+ DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
+ actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
+ }
+
+ // TODO(Performance improvement): Instead of sending several packets, combine them into one
+ databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ {
+ bool sendData = false;
+ auto actionGapsIt = actionGaps.find(creatorPublicKey);
+ if(actionGapsIt == actionGaps.end())
+ sendData = true;
+ else
+ {
+ for(const auto &userActionGaps : actionGapsIt->second)
+ {
+ if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
+ {
+ sendData = true;
+ break;
+ }
+ }
+ }
+
+ if(!sendData) return;
Value value((u8*)rawData.data, rawData.size);
node.put(requestResponseInfoHash, move(value), [](bool ok)
{
@@ -255,13 +292,39 @@ namespace odhtdb
return true;
});
newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture));
-
seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo;
sibs::SafeSerializer serializer;
- serializer.add((u64)0); // Timestamp in microseconds, fetch data newer than this. // TODO: Get timestamp from database storage
serializer.add(responseKey, OPENDHT_INFOHASH_LEN);
- node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok)
+ bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash());
+ serializer.add(iHaveCreateNode ? (u8)0 : (u8)1);
+ DataViewMap<u64> userLatestActionCounter;
+
+ databaseStorage.fetchNodeUserActionGaps(*nodeToSeed.getRequestHash(), [&serializer, &userLatestActionCounter](const DataView userPublicKey, u64 actionGapStart, u64 actionGapRange)
+ {
+ serializer.add((const u8*)userPublicKey.data, PUBLIC_KEY_NUM_BYTES);
+ serializer.add(actionGapStart);
+ serializer.add(actionGapRange);
+ userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], actionGapStart + actionGapRange);
+ });
+
+ databaseStorage.fetchNodeUserLatestActionCounter(*nodeToSeed.getRequestHash(), [&userLatestActionCounter](const DataView userPublicKey, u64 latestActionCounter)
+ {
+ userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], latestActionCounter);
+ });
+
+ for(auto userLatestActionCounterData : userLatestActionCounter)
+ {
+ // Public key
+ serializer.add((const u8*)userLatestActionCounterData.first.data, PUBLIC_KEY_NUM_BYTES);
+ // Latest action counter start
+ serializer.add(userLatestActionCounterData.second);
+ // Latest action counter range (infinite range, meaning we want all packets older than start (latest known packet by user))
+ serializer.add(~(u64)0ULL - userLatestActionCounterData.second);
+ }
+
+ Value requestValue(move(serializer.getBuffer()));
+ node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok)
{
if(!ok)
Log::warn("Failed to put request to get old data");
@@ -291,14 +354,12 @@ namespace odhtdb
{
shared_ptr<Signature::KeyPair> creatorKeyPair = make_shared<Signature::KeyPair>();
- // TODO: Should this be declared static? is there any difference in behavior/performance?
- boost::uuids::random_generator uuidGen;
auto adminGroupId = uuidGen();
assert(adminGroupId.size() == GROUP_ID_LENGTH);
// Header
sibs::SafeSerializer serializer;
- serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version
+ serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION);
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add((u8*)creatorKeyPair->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES);
@@ -353,20 +414,20 @@ namespace odhtdb
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add(DatabaseOperation::ADD_DATA);
+ u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
+ serializer.add(newActionCounter);
DataView encryptionKey(nodeInfo.getNodeEncryptionKey()->data, ENCRYPTION_KEY_BYTE_SIZE);
Encryption encryptedBody(dataToAdd, DataView(), encryptionKey);
- DataView requestData = combine(serializer, encryptedBody);
- string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData);
- DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
+ OwnedMemory requestData = combine(serializer, encryptedBody);
+ string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData.getView());
+ OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
DataView encryptedDataView((char*)requestData.data + serializer.getBuffer().size(), requestData.size - serializer.getBuffer().size());
- databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
- delete[] (char*)requestData.data;
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(requestDataHash);
Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- delete[] (char*)stagedAddObject.data;
node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
{
// TODO: Handle failure to put data
@@ -391,26 +452,25 @@ namespace odhtdb
u64 timestampCombined = getSyncedTimestampUtc().getCombined();
serializer.add(timestampCombined);
serializer.add(DatabaseOperation::ADD_USER);
+ u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1;
+ serializer.add(newActionCounter);
usize additionalDataOffset = serializer.getBuffer().size();
serializer.add((u8*)userToAddPublicKey.getData(), PUBLIC_KEY_NUM_BYTES);
serializer.add((uint8_t*)groupToAddUserTo.data, groupToAddUserTo.size);
- // TODO: Should this be declared static? is there any difference in behavior/performance?
- boost::uuids::random_generator uuidGen;
auto padding = uuidGen();
assert(padding.size() == 16);
serializer.add(padding.data, padding.size());
DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() };
string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData);
- DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
+ OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData);
Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
DataView additionalDataView((void*)(static_cast<const char*>(requestData.data) + additionalDataOffset), requestData.size - additionalDataOffset);
- databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(requestDataHash);
Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size);
- delete[] (char*)stagedAddObject.data;
node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok)
{
// TODO: Handle failure to put data
@@ -511,8 +571,10 @@ namespace odhtdb
*/
DatabaseOperation operation = deserializerUnsigned.extract<DatabaseOperation>();
+ u64 newActionCounter = deserializerUnsigned.extract<u64>();
+
DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize());
- databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
+ databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
}
bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedMemory> encryptionKey)
@@ -520,6 +582,8 @@ namespace odhtdb
Log::debug("Got create data");
try
{
+ // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
+ // the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesNodeExist(hash))
throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
deserializeCreateRequest(value, hash, encryptionKey);
@@ -536,10 +600,11 @@ namespace odhtdb
Log::debug("Got add data");
try
{
+ // This check is here to reduce processing, it doesn't matter much if the packet bypasses this,
+ // the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesDataExist(requestDataHash))
throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)");
deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey);
- //Log::debug("Got add object, timestamp: %zu", addObject.timestamp);
}
catch (RequestQuarantineException &e)
{
diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp
index d7f42bb..6064ee5 100644
--- a/src/DatabaseStorage.cpp
+++ b/src/DatabaseStorage.cpp
@@ -6,6 +6,7 @@
#include "../include/odhtdb/Log.hpp"
#include "../include/odhtdb/Database.hpp"
#include "../include/odhtdb/sql/SqlQuery.hpp"
+#include "../include/odhtdb/sql/SqlExec.hpp"
#include <cstring>
#include <chrono>
#include <boost/filesystem/convenience.hpp>
@@ -101,24 +102,26 @@ namespace odhtdb
sqlite_exec_checked(sqliteDb,
"CREATE TABLE IF NOT EXISTS Node(id INTEGER PRIMARY KEY, nodeHash BLOB UNIQUE NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, adminGroupId BLOB NOT NULL);"
- "CREATE TABLE IF NOT EXISTS NodeUser(node BLOB NOT NULL, publicKey BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));"
+ "CREATE TABLE IF NOT EXISTS NodeUser(id INTEGER PRIMARY KEY, node BLOB NOT NULL, publicKey BLOB NOT NULL, latestActionCounter INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));"
"CREATE TABLE IF NOT EXISTS NodeGroup(node BLOB NOT NULL, groupId BLOB UNIQUE NOT NULL, permissionLevel INT NOT NULL, permissionFlags INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));"
- "CREATE TABLE IF NOT EXISTS NodeAddData(id INTEGER PRIMARY KEY, node BLOB NOT NULL, requestHash BLOB UNIQUE NOT NULL, operation INT NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, decrypted INT NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));"
+ "CREATE TABLE IF NOT EXISTS NodeAddData(id INTEGER PRIMARY KEY, node BLOB NOT NULL, requestHash BLOB UNIQUE NOT NULL, operation INT NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, decrypted INT NOT NULL, userActionCounter INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));"
"CREATE TABLE IF NOT EXISTS NodeAddDataAdditional(id INTEGER PRIMARY KEY, nodeAddDataId INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));"
"CREATE TABLE IF NOT EXISTS NodeAddUserData(id INTEGER PRIMARY KEY, nodeAddDataId INTEGER NOT NULL, userToAddPublicKey BLOB NOT NULL, groupId BLOB NOT NULL, FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));"
"CREATE TABLE IF NOT EXISTS NodeDecryptionKey(node BLOB UNIQUE NOT NULL, decryptionKey BLOB NOT NULL);"
"CREATE TABLE IF NOT EXISTS NodeUserGroupAssoc(node BLOB NOT NULL, userPublicKey BLOB NOT NULL, groupId BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash), FOREIGN KEY(userPublicKey) REFERENCES NodeUser(publicKey), FOREIGN KEY(groupId) REFERENCES NodeGroup(groupId));"
"CREATE TABLE IF NOT EXISTS NodeRaw(node INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(id));"
- "CREATE TABLE IF NOT EXISTS NodeAddDataRaw(node INTEGER NOT NULL, nodeAddData INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(id), FOREIGN KEY(nodeAddData) REFERENCES NodeAddData(id));"
+ "CREATE TABLE IF NOT EXISTS NodeAddDataRaw(nodeId INTEGER NOT NULL, nodeAddDataId INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(nodeId) REFERENCES Node(id), FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));"
+
+ "CREATE TABLE IF NOT EXISTS NodeUserActionGap(id INTEGER PRIMARY KEY, nodeUserId INTEGER NOT NULL, start INTEGER NOT NULL, range INTEGER NOT NULL, FOREIGN KEY(nodeUserId) REFERENCES NodeUser(id));"
"CREATE UNIQUE INDEX IF NOT EXISTS UniqueUserInNode ON NodeUser(node, publicKey);"
"CREATE UNIQUE INDEX IF NOT EXISTS UniqueUserGroupAssoc ON NodeUserGroupAssoc(node, userPublicKey, groupId);");
sqlite_prepare_checked(sqliteDb, "INSERT INTO Node(nodeHash, timestamp, creatorPublicKey, adminGroupId) VALUES(?, ?, ?, ?)", &insertNodeStmt);
- sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUser(node, publicKey) VALUES(?, ?)", &insertUserStmt);
+ sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUser(node, publicKey, latestActionCounter) VALUES(?, ?, 0)", &insertUserStmt);
sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeGroup(node, groupId, permissionLevel, permissionFlags) VALUES(?, ?, ?, ?)", &insertGroupStmt);
- sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddData(node, requestHash, operation, timestamp, creatorPublicKey, decrypted) VALUES(?, ?, ?, ?, ?, ?)", &insertNodeAddDataStmt);
+ sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddData(node, requestHash, operation, timestamp, creatorPublicKey, decrypted, userActionCounter) VALUES(?, ?, ?, ?, ?, ?, ?)", &insertNodeAddDataStmt);
sqlite_prepare_checked(sqliteDb, "INSERT OR REPLACE INTO NodeDecryptionKey(node, decryptionKey) VALUES(?, ?)", &setNodeDecryptionKeyStmt);
sqlite_prepare_checked(sqliteDb, "SELECT decryptionKey FROM NodeDecryptionKey WHERE node = ?", &getNodeDecryptionKeyStmt);
sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUserGroupAssoc(node, userPublicKey, groupId) VALUES(?, ?, ?)", &insertNodeUserGroupAssocStmt);
@@ -128,7 +131,7 @@ namespace odhtdb
sqlite_prepare_checked(sqliteDb, "SELECT id FROM Node WHERE nodeHash = ?", &selectNodeIdStatement);
sqlite_prepare_checked(sqliteDb, "SELECT id FROM NodeAddData WHERE requestHash = ?", &selectNodeAddDataIdStatement);
sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeRaw(node, data) VALUES(?, ?)", &insertNodeRawStmt);
- sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataRaw(node, nodeAddData, data) VALUES(?, ?, ?)", &insertNodeAddDataRawStmt);
+ sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataRaw(nodeId, nodeAddDataId, data) VALUES(?, ?, ?)", &insertNodeAddDataRawStmt);
sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataAdditional(nodeAddDataId, data) VALUES(?, ?)", &insertNodeAddDataAdditionalStmt);
sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddUserData(nodeAddDataId, userToAddPublicKey, groupId) VALUES(?, ?, ?)", &insertNodeAddUserDataStmt);
@@ -456,9 +459,97 @@ namespace odhtdb
decryptNodeData(hash, nodeDecryptionKeyResult.second, &adminPublicKey, adminGroupId, timestamp);
}
- void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView)
+ void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, u64 newUserActionCounter, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView)
{
sqlite3_exec(sqliteDb, "BEGIN", 0, 0, 0);
+
+ {
+ SqlQuery selectUserIdAndActionCounter(sqliteDb, "SELECT id, latestActionCounter FROM NodeUser WHERE node = ? AND publicKey = ?",
+ { DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)creatorPublicKey.getData(), creatorPublicKey.getSize()) });
+ if(!selectUserIdAndActionCounter.next())
+ {
+ string errMsg = "User ";
+ errMsg += creatorPublicKey.toString();
+ errMsg += " not found in node ";
+ errMsg += nodeHash.toString();
+ throw DatabaseStorageNotFound(errMsg);
+ }
+
+ i64 nodeUserRowId = selectUserIdAndActionCounter.getInt64(0);
+ u64 userActionCounter = selectUserIdAndActionCounter.getInt64(1);
+
+ if(newUserActionCounter == userActionCounter)
+ {
+ sqlite3_exec(sqliteDb, "ROLLBACK", 0, 0, 0);
+ throw DatabaseStorageException("Got unique package but action counter was equal to users existing one, discarding packet");
+ }
+ else if(newUserActionCounter == userActionCounter + 1)
+ {
+ SqlExec setUserActionCounter(sqliteDb, "UPDATE NodeUser SET latestActionCounter = ? WHERE id = ?");
+ setUserActionCounter.execWithArgs({
+ newUserActionCounter,
+ nodeUserRowId
+ });
+ }
+ else
+ {
+ SqlQuery existingActionGap(sqliteDb, "SELECT id, start, range FROM NodeUserActionGap WHERE nodeUserId = ? AND ? >= start AND ? <= start + range",
+ { nodeUserRowId, newUserActionCounter, newUserActionCounter });
+ if(existingActionGap.next())
+ {
+ i64 actionGapRowId = existingActionGap.getInt64(0);
+ u64 start = existingActionGap.getInt64(1);
+ u64 range = existingActionGap.getInt64(2);
+
+ SqlExec removeRange(sqliteDb, "DELETE FROM NodeUserActionGap WHERE id = ?");
+ removeRange.execWithArgs({ actionGapRowId });
+
+ if(range == 1)
+ {
+ if(start + range > userActionCounter)
+ {
+ SqlExec setUserActionCounter(sqliteDb, "UPDATE NodeUser SET latestActionCounter = ? WHERE id = ?");
+ setUserActionCounter.execWithArgs({
+ start + range,
+ nodeUserRowId
+ });
+ }
+ }
+ else
+ {
+ SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)");
+
+ u64 startBefore = start;
+ u64 rangeBefore = newUserActionCounter - start;
+ if(rangeBefore > 0)
+ addUserActionGap.execWithArgs({ nodeUserRowId, startBefore, rangeBefore });
+
+ u64 startAfter = newUserActionCounter + 1;
+ u64 rangeAfter = (start + range) - newUserActionCounter;
+ if(rangeAfter > 0)
+ addUserActionGap.execWithArgs({ nodeUserRowId, startAfter, rangeAfter });
+ }
+ }
+ else
+ {
+ if(newUserActionCounter > userActionCounter + 1)
+ {
+ u64 start = userActionCounter + 1;
+ u64 range = newUserActionCounter - start;
+ SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)");
+ addUserActionGap.execWithArgs({ nodeUserRowId, start, range });
+ }
+ else if(newUserActionCounter < userActionCounter)
+ {
+ u64 start = newUserActionCounter;
+ u64 range = userActionCounter - start;
+ SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)");
+ addUserActionGap.execWithArgs({ nodeUserRowId, start, range });
+ }
+ }
+ }
+ }
+
{
sqlite3_reset(insertNodeAddDataStmt);
sqlite3_clear_bindings(insertNodeAddDataStmt);
@@ -482,6 +573,9 @@ namespace odhtdb
rc = sqlite3_bind_int(insertNodeAddDataStmt, 6, 0);
bindCheckError(rc);
+ rc = sqlite3_bind_int(insertNodeAddDataStmt, 7, newUserActionCounter);
+ bindCheckError(rc);
+
sqlite_step_rollback_on_failure(sqliteDb, insertNodeAddDataStmt, "insert data into NodeAddData");
}
@@ -537,7 +631,7 @@ namespace odhtdb
else
{
sqlite3_exec(sqliteDb, "ROLLBACK", 0, 0, 0);
- throw std::runtime_error("Unexpected operation type");
+ throw ("Unexpected operation type");
}
{
@@ -633,18 +727,44 @@ namespace odhtdb
SqlQuery query(sqliteDb, "SELECT data FROM NodeRaw WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) });
while(query.next())
{
- const DataView data = query.getBlob(0);
- callbackFunc(data);
+ const DataView rawData = query.getBlob(0);
+ callbackFunc(rawData);
}
}
void DatabaseStorage::fetchNodeAddDataRaw(const Hash &nodeHash, FetchNodeAddDataRawCallbackFunc callbackFunc)
{
- SqlQuery query(sqliteDb, "SELECT data FROM NodeAddDataRaw WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) });
+ SqlQuery query(sqliteDb, "SELECT rawData.data, nodeAddData.creatorPublicKey, nodeAddData.userActionCounter From NodeAddData AS nodeAddData INNER JOIN NodeAddDataRaw AS rawData ON rawData.nodeAddDataId = nodeAddData.id WHERE nodeAddData.node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) });
while(query.next())
{
- const DataView data = query.getBlob(0);
- callbackFunc(data);
+ const DataView rawData = query.getBlob(0);
+ const DataView creatorPublicKey = query.getBlob(1);
+ u64 userActionCounter = query.getInt64(2);
+ callbackFunc(rawData, creatorPublicKey, userActionCounter);
+ }
+ }
+
+ void DatabaseStorage::fetchNodeUserActionGaps(const Hash &nodeHash, FetchNodeUserActionGapsCallbackFunc callbackFunc)
+ {
+ SqlQuery query(sqliteDb, "SELECT user.publicKey, actionGap.start, actionGap.range FROM NodeUser AS user INNER JOIN NodeUserActionGap AS actionGap ON actionGap.nodeUserId = user.id WHERE user.node = ?",
+ { DataView(nodeHash.getData(), nodeHash.getSize()) });
+ while(query.next())
+ {
+ const DataView userPublicKey = query.getBlob(0);
+ u64 actionGapStart = query.getInt64(1);
+ u64 actionGapRange = query.getInt64(2);
+ callbackFunc(userPublicKey, actionGapStart, actionGapRange);
+ }
+ }
+
+ void DatabaseStorage::fetchNodeUserLatestActionCounter(const Hash &nodeHash, FetchNodeUserLatestActionCounterCallbackFunc callbackFunc)
+ {
+ SqlQuery query(sqliteDb, "SELECT publicKey, latestActionCounter FROM NodeUser WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) });
+ while(!query.next())
+ {
+ const DataView userPublicKey = query.getBlob(0);
+ u64 latestActionCounter = query.getInt64(1);
+ callbackFunc(userPublicKey, latestActionCounter);
}
}
@@ -677,6 +797,21 @@ namespace odhtdb
{ DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)userPublicKey.getData(), userPublicKey.getSize()), groupToAddPermissionLevel, (i64)PermissionType::ADD_USER_SAME_LEVEL, groupToAddPermissionLevel, (i64)PermissionType::ADD_USER_HIGHER_LEVEL });
return queryCreatorGroupWithRightsToAddUserToGroup.next();
}
+
+ u64 DatabaseStorage::getUserActionCounter(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const
+ {
+ SqlQuery query(sqliteDb, "SELECT latestActionCounter FROM NodeUser WHERE node = ? AND publicKey = ?",
+ { DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)userPublicKey.getData(), userPublicKey.getSize()) });
+ if(!query.next())
+ {
+ string errMsg = "User ";
+ errMsg += userPublicKey.toString();
+ errMsg += " not found in node ";
+ errMsg += nodeHash.toString();
+ throw DatabaseStorageNotFound(errMsg);
+ }
+ return query.getInt64(0);
+ }
#if 0
bool DatabaseStorage::storeLocalUser(const string &username, const Signature::KeyPair &keyPair, const string &password)
{
diff --git a/src/sql/Sql.cpp b/src/sql/Sql.cpp
new file mode 100644
index 0000000..754a30d
--- /dev/null
+++ b/src/sql/Sql.cpp
@@ -0,0 +1,21 @@
+#include "../../include/odhtdb/sql/Sql.hpp"
+#include <sqlite3.h>
+
+namespace odhtdb
+{
+ int SqlArg::bind(sqlite3_stmt *stmt, int paramIndex) const
+ {
+ switch(type)
+ {
+ case Type::DATA_VIEW:
+ return sqlite3_bind_blob(stmt, paramIndex, dataView.data, dataView.size, SQLITE_STATIC);
+ case Type::INT:
+ return sqlite3_bind_int(stmt, paramIndex, integer);
+ case Type::INT64:
+ return sqlite3_bind_int64(stmt, paramIndex, integer64);
+ case Type::UINT64: // TODO: Find a way to use u64 in sqlite
+ return sqlite3_bind_int64(stmt, paramIndex, uinteger64);
+ }
+ return SQLITE_OK;
+ }
+}
diff --git a/src/sql/SqlExec.cpp b/src/sql/SqlExec.cpp
new file mode 100644
index 0000000..732b2f1
--- /dev/null
+++ b/src/sql/SqlExec.cpp
@@ -0,0 +1,72 @@
+#include "../../include/odhtdb/sql/SqlExec.hpp"
+#include <sqlite3.h>
+
+namespace odhtdb
+{
+ SqlExec::SqlExec(sqlite3 *_db, const char *sql) :
+ db(_db),
+ stmt(nullptr)
+ {
+ int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr);
+ if(rc != SQLITE_OK)
+ {
+ std::string errMsg = "Failed to prepare sqlite statement, error: ";
+ errMsg += sqlite3_errmsg(db);
+ sqlite3_exec(db, "ROLLBACK", 0, 0, 0);
+ throw SqlExecException(errMsg);
+ }
+ }
+
+ SqlExec::~SqlExec()
+ {
+ sqlite3_finalize(stmt);
+ }
+
+ void SqlExec::execWithArgs(std::initializer_list<SqlArg> args)
+ {
+ std::lock_guard<std::mutex> lock(mutex);
+
+ sqlite3_reset(stmt);
+ sqlite3_clear_bindings(stmt);
+
+ int numParams = sqlite3_bind_parameter_count(stmt);
+ if(args.size() != numParams)
+ {
+ std::string errMsg = "Failed to prepare sqlite statement, error: Sql has ";
+ errMsg += std::to_string(numParams);
+ errMsg += " parameters, got ";
+ errMsg += std::to_string(args.size());
+ errMsg += " arguments";
+ sqlite3_exec(db, "ROLLBACK", 0, 0, 0);
+ throw SqlExecException(errMsg);
+ }
+
+ int paramIndex = 1;
+ for(const SqlArg &arg : args)
+ {
+ int rc = arg.bind(stmt, paramIndex);
+ if(rc != SQLITE_OK)
+ {
+ std::string errMsg = "Failed to bind param, error code: ";
+ errMsg += std::to_string(rc);
+ sqlite3_exec(db, "ROLLBACK", 0, 0, 0);
+ throw SqlExecException(errMsg);
+ }
+ ++paramIndex;
+ }
+
+ int rc = sqlite3_step(stmt);
+ if(rc != SQLITE_DONE)
+ {
+ std::string errMsg = "Failed to perform sql exec, error: ";
+ errMsg += sqlite3_errmsg(db);
+ sqlite3_exec(db, "ROLLBACK", 0, 0, 0);
+ throw SqlExecException(errMsg);
+ }
+ }
+
+ void SqlExec::exec()
+ {
+ execWithArgs({});
+ }
+}
diff --git a/src/sql/SqlQuery.cpp b/src/sql/SqlQuery.cpp
index 47f1463..b99f92d 100644
--- a/src/sql/SqlQuery.cpp
+++ b/src/sql/SqlQuery.cpp
@@ -3,20 +3,6 @@
namespace odhtdb
{
- int SqlArg::bind(sqlite3_stmt *stmt, int paramIndex) const
- {
- switch(type)
- {
- case Type::DATA_VIEW:
- return sqlite3_bind_blob(stmt, paramIndex, dataView.data, dataView.size, SQLITE_STATIC);
- case Type::INT:
- return sqlite3_bind_int(stmt, paramIndex, integer);
- case Type::INT64:
- return sqlite3_bind_int64(stmt, paramIndex, integer64);
- }
- return SQLITE_OK;
- }
-
SqlQuery::SqlQuery(sqlite3 *_db, const char *sql, std::initializer_list<SqlArg> args) :
db(_db),
stmt(nullptr),