diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Database.cpp | 129 | ||||
-rw-r--r-- | src/DatabaseStorage.cpp | 161 | ||||
-rw-r--r-- | src/sql/Sql.cpp | 21 | ||||
-rw-r--r-- | src/sql/SqlExec.cpp | 72 | ||||
-rw-r--r-- | src/sql/SqlQuery.cpp | 14 |
5 files changed, 338 insertions, 59 deletions
diff --git a/src/Database.cpp b/src/Database.cpp index 9985bb9..fdafab8 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -24,13 +24,13 @@ static time_t timeOffset = 0; // Updated by comparing local time with ntp server static odhtdb::u64 timeOffsetFraction = 0; static thread *ntpThread = nullptr; static bool timestampSynced = false; -static InfoHash CREATE_DATA_HASH = InfoHash::get("__odhtdb__.create_data"); -static InfoHash ADD_DATA_HASH = InfoHash::get("__odhtdb__.add_data"); const int OPENDHT_INFOHASH_LEN = 20; namespace odhtdb { + static boost::uuids::random_generator uuidGen; + const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; @@ -40,23 +40,23 @@ namespace odhtdb RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {} }; - DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) + OwnedMemory combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) { usize allocationSize = headerSerializer.getBuffer().size() + encryptedData.getNonce().size + encryptedData.getCipherText().size; char *result = new char[allocationSize]; memcpy(result, headerSerializer.getBuffer().data(), headerSerializer.getBuffer().size()); memcpy(result + headerSerializer.getBuffer().size(), encryptedData.getNonce().data, encryptedData.getNonce().size); memcpy(result + headerSerializer.getBuffer().size() + encryptedData.getNonce().size, encryptedData.getCipherText().data, encryptedData.getCipherText().size); - return DataView(result, allocationSize); + return OwnedMemory(result, allocationSize); } - DataView combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData) + OwnedMemory combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData) { usize allocationSize = publicKey.getSize() + signedEncryptedData.size(); char *result = new char[allocationSize]; memcpy(result, publicKey.getData(), publicKey.getSize()); memcpy(result + publicKey.getSize(), signedEncryptedData.data(), signedEncryptedData.size()); - return DataView(result, allocationSize); + return OwnedMemory(result, allocationSize); } DatabaseCreateResponse::DatabaseCreateResponse(std::shared_ptr<Signature::KeyPair> _nodeAdminKeyPair, std::shared_ptr<OwnedMemory> _nodeAdminGroupId, shared_ptr<OwnedMemory> _key, shared_ptr<Hash> _hash) : @@ -163,6 +163,12 @@ namespace odhtdb --databaseCount; node.join(); } + + struct ActionGap + { + u64 start; + u64 range; + }; void Database::seed(const DatabaseNode &nodeToSeed) { @@ -211,7 +217,7 @@ namespace odhtdb }); newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture)); - // TODO: Before listening on this key, we should check how many remote peers are also providing this data. + // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed](const shared_ptr<Value> &value) { @@ -219,12 +225,12 @@ namespace odhtdb try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u64 dataStartTimestamp = deserializer.extract<u64>(); - u8 requestResponseKey[OPENDHT_INFOHASH_LEN]; - deserializer.extract(requestResponseKey, OPENDHT_INFOHASH_LEN); - InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN); + InfoHash requestResponseInfoHash; + deserializer.extract(requestResponseInfoHash.data(), OPENDHT_INFOHASH_LEN); + static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); + bool userWantsCreateNode = deserializer.extract<u8>() == 1; - if(dataStartTimestamp == 0) + if(userWantsCreateNode) { databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { @@ -238,8 +244,39 @@ namespace odhtdb }); } - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap<vector<ActionGap>> actionGaps; + while(!deserializer.empty()) { + u8 userPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; + deserializer.extract(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract<u64>(); + u64 actionGapRange = deserializer.extract<u64>(); + + DataView userPublicKey(userPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + } + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) + sendData = true; + else + { + for(const auto &userActionGaps : actionGapsIt->second) + { + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) + { + sendData = true; + break; + } + } + } + + if(!sendData) return; Value value((u8*)rawData.data, rawData.size); node.put(requestResponseInfoHash, move(value), [](bool ok) { @@ -255,13 +292,39 @@ namespace odhtdb return true; }); newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture)); - seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo; sibs::SafeSerializer serializer; - serializer.add((u64)0); // Timestamp in microseconds, fetch data newer than this. // TODO: Get timestamp from database storage serializer.add(responseKey, OPENDHT_INFOHASH_LEN); - node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) + bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash()); + serializer.add(iHaveCreateNode ? (u8)0 : (u8)1); + DataViewMap<u64> userLatestActionCounter; + + databaseStorage.fetchNodeUserActionGaps(*nodeToSeed.getRequestHash(), [&serializer, &userLatestActionCounter](const DataView userPublicKey, u64 actionGapStart, u64 actionGapRange) + { + serializer.add((const u8*)userPublicKey.data, PUBLIC_KEY_NUM_BYTES); + serializer.add(actionGapStart); + serializer.add(actionGapRange); + userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], actionGapStart + actionGapRange); + }); + + databaseStorage.fetchNodeUserLatestActionCounter(*nodeToSeed.getRequestHash(), [&userLatestActionCounter](const DataView userPublicKey, u64 latestActionCounter) + { + userLatestActionCounter[userPublicKey] = std::max(userLatestActionCounter[userPublicKey], latestActionCounter); + }); + + for(auto userLatestActionCounterData : userLatestActionCounter) + { + // Public key + serializer.add((const u8*)userLatestActionCounterData.first.data, PUBLIC_KEY_NUM_BYTES); + // Latest action counter start + serializer.add(userLatestActionCounterData.second); + // Latest action counter range (infinite range, meaning we want all packets older than start (latest known packet by user)) + serializer.add(~(u64)0ULL - userLatestActionCounterData.second); + } + + Value requestValue(move(serializer.getBuffer())); + node.put(dhtKey.getRequestOldDataKey(), move(requestValue), [](bool ok) { if(!ok) Log::warn("Failed to put request to get old data"); @@ -291,14 +354,12 @@ namespace odhtdb { shared_ptr<Signature::KeyPair> creatorKeyPair = make_shared<Signature::KeyPair>(); - // TODO: Should this be declared static? is there any difference in behavior/performance? - boost::uuids::random_generator uuidGen; auto adminGroupId = uuidGen(); assert(adminGroupId.size() == GROUP_ID_LENGTH); // Header sibs::SafeSerializer serializer; - serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version + serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add((u8*)creatorKeyPair->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES); @@ -353,20 +414,20 @@ namespace odhtdb u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add(DatabaseOperation::ADD_DATA); + u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1; + serializer.add(newActionCounter); DataView encryptionKey(nodeInfo.getNodeEncryptionKey()->data, ENCRYPTION_KEY_BYTE_SIZE); Encryption encryptedBody(dataToAdd, DataView(), encryptionKey); - DataView requestData = combine(serializer, encryptedBody); - string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData); - DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); + OwnedMemory requestData = combine(serializer, encryptedBody); + string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData.getView()); + OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); DataView encryptedDataView((char*)requestData.data + serializer.getBuffer().size(), requestData.size - serializer.getBuffer().size()); - databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); - delete[] (char*)requestData.data; + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(requestDataHash); Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - delete[] (char*)stagedAddObject.data; node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) { // TODO: Handle failure to put data @@ -391,26 +452,25 @@ namespace odhtdb u64 timestampCombined = getSyncedTimestampUtc().getCombined(); serializer.add(timestampCombined); serializer.add(DatabaseOperation::ADD_USER); + u64 newActionCounter = databaseStorage.getUserActionCounter(*nodeInfo.getRequestHash(), userToPerformActionWith.getPublicKey()) + 1; + serializer.add(newActionCounter); usize additionalDataOffset = serializer.getBuffer().size(); serializer.add((u8*)userToAddPublicKey.getData(), PUBLIC_KEY_NUM_BYTES); serializer.add((uint8_t*)groupToAddUserTo.data, groupToAddUserTo.size); - // TODO: Should this be declared static? is there any difference in behavior/performance? - boost::uuids::random_generator uuidGen; auto padding = uuidGen(); assert(padding.size() == 16); serializer.add(padding.data, padding.size()); DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() }; string signedRequestData = userToPerformActionWith.getPrivateKey().sign(requestData); - DataView stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); + OwnedMemory stagedAddObject = combine(userToPerformActionWith.getPublicKey(), signedRequestData); Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); DataView additionalDataView((void*)(static_cast<const char*>(requestData.data) + additionalDataOffset), requestData.size - additionalDataOffset); - databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(requestDataHash); Value addDataValue((u8*)stagedAddObject.data, stagedAddObject.size); - delete[] (char*)stagedAddObject.data; node.put(dhtKey.getNewDataListenerKey(), move(addDataValue), [](bool ok) { // TODO: Handle failure to put data @@ -511,8 +571,10 @@ namespace odhtdb */ DatabaseOperation operation = deserializerUnsigned.extract<DatabaseOperation>(); + u64 newActionCounter = deserializerUnsigned.extract<u64>(); + DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); - databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); + databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); } bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedMemory> encryptionKey) @@ -520,6 +582,8 @@ namespace odhtdb Log::debug("Got create data"); try { + // This check is here to reduce processing, it doesn't matter much if the packet bypasses this, + // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesNodeExist(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); deserializeCreateRequest(value, hash, encryptionKey); @@ -536,10 +600,11 @@ namespace odhtdb Log::debug("Got add data"); try { + // This check is here to reduce processing, it doesn't matter much if the packet bypasses this, + // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesDataExist(requestDataHash)) throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)"); deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey); - //Log::debug("Got add object, timestamp: %zu", addObject.timestamp); } catch (RequestQuarantineException &e) { diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index d7f42bb..6064ee5 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -6,6 +6,7 @@ #include "../include/odhtdb/Log.hpp" #include "../include/odhtdb/Database.hpp" #include "../include/odhtdb/sql/SqlQuery.hpp" +#include "../include/odhtdb/sql/SqlExec.hpp" #include <cstring> #include <chrono> #include <boost/filesystem/convenience.hpp> @@ -101,24 +102,26 @@ namespace odhtdb sqlite_exec_checked(sqliteDb, "CREATE TABLE IF NOT EXISTS Node(id INTEGER PRIMARY KEY, nodeHash BLOB UNIQUE NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, adminGroupId BLOB NOT NULL);" - "CREATE TABLE IF NOT EXISTS NodeUser(node BLOB NOT NULL, publicKey BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));" + "CREATE TABLE IF NOT EXISTS NodeUser(id INTEGER PRIMARY KEY, node BLOB NOT NULL, publicKey BLOB NOT NULL, latestActionCounter INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));" "CREATE TABLE IF NOT EXISTS NodeGroup(node BLOB NOT NULL, groupId BLOB UNIQUE NOT NULL, permissionLevel INT NOT NULL, permissionFlags INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));" - "CREATE TABLE IF NOT EXISTS NodeAddData(id INTEGER PRIMARY KEY, node BLOB NOT NULL, requestHash BLOB UNIQUE NOT NULL, operation INT NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, decrypted INT NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));" + "CREATE TABLE IF NOT EXISTS NodeAddData(id INTEGER PRIMARY KEY, node BLOB NOT NULL, requestHash BLOB UNIQUE NOT NULL, operation INT NOT NULL, timestamp INTEGER NOT NULL, creatorPublicKey BLOB NOT NULL, decrypted INT NOT NULL, userActionCounter INTEGER NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash));" "CREATE TABLE IF NOT EXISTS NodeAddDataAdditional(id INTEGER PRIMARY KEY, nodeAddDataId INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));" "CREATE TABLE IF NOT EXISTS NodeAddUserData(id INTEGER PRIMARY KEY, nodeAddDataId INTEGER NOT NULL, userToAddPublicKey BLOB NOT NULL, groupId BLOB NOT NULL, FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));" "CREATE TABLE IF NOT EXISTS NodeDecryptionKey(node BLOB UNIQUE NOT NULL, decryptionKey BLOB NOT NULL);" "CREATE TABLE IF NOT EXISTS NodeUserGroupAssoc(node BLOB NOT NULL, userPublicKey BLOB NOT NULL, groupId BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(nodeHash), FOREIGN KEY(userPublicKey) REFERENCES NodeUser(publicKey), FOREIGN KEY(groupId) REFERENCES NodeGroup(groupId));" "CREATE TABLE IF NOT EXISTS NodeRaw(node INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(id));" - "CREATE TABLE IF NOT EXISTS NodeAddDataRaw(node INTEGER NOT NULL, nodeAddData INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(node) REFERENCES Node(id), FOREIGN KEY(nodeAddData) REFERENCES NodeAddData(id));" + "CREATE TABLE IF NOT EXISTS NodeAddDataRaw(nodeId INTEGER NOT NULL, nodeAddDataId INTEGER NOT NULL, data BLOB NOT NULL, FOREIGN KEY(nodeId) REFERENCES Node(id), FOREIGN KEY(nodeAddDataId) REFERENCES NodeAddData(id));" + + "CREATE TABLE IF NOT EXISTS NodeUserActionGap(id INTEGER PRIMARY KEY, nodeUserId INTEGER NOT NULL, start INTEGER NOT NULL, range INTEGER NOT NULL, FOREIGN KEY(nodeUserId) REFERENCES NodeUser(id));" "CREATE UNIQUE INDEX IF NOT EXISTS UniqueUserInNode ON NodeUser(node, publicKey);" "CREATE UNIQUE INDEX IF NOT EXISTS UniqueUserGroupAssoc ON NodeUserGroupAssoc(node, userPublicKey, groupId);"); sqlite_prepare_checked(sqliteDb, "INSERT INTO Node(nodeHash, timestamp, creatorPublicKey, adminGroupId) VALUES(?, ?, ?, ?)", &insertNodeStmt); - sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUser(node, publicKey) VALUES(?, ?)", &insertUserStmt); + sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUser(node, publicKey, latestActionCounter) VALUES(?, ?, 0)", &insertUserStmt); sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeGroup(node, groupId, permissionLevel, permissionFlags) VALUES(?, ?, ?, ?)", &insertGroupStmt); - sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddData(node, requestHash, operation, timestamp, creatorPublicKey, decrypted) VALUES(?, ?, ?, ?, ?, ?)", &insertNodeAddDataStmt); + sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddData(node, requestHash, operation, timestamp, creatorPublicKey, decrypted, userActionCounter) VALUES(?, ?, ?, ?, ?, ?, ?)", &insertNodeAddDataStmt); sqlite_prepare_checked(sqliteDb, "INSERT OR REPLACE INTO NodeDecryptionKey(node, decryptionKey) VALUES(?, ?)", &setNodeDecryptionKeyStmt); sqlite_prepare_checked(sqliteDb, "SELECT decryptionKey FROM NodeDecryptionKey WHERE node = ?", &getNodeDecryptionKeyStmt); sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeUserGroupAssoc(node, userPublicKey, groupId) VALUES(?, ?, ?)", &insertNodeUserGroupAssocStmt); @@ -128,7 +131,7 @@ namespace odhtdb sqlite_prepare_checked(sqliteDb, "SELECT id FROM Node WHERE nodeHash = ?", &selectNodeIdStatement); sqlite_prepare_checked(sqliteDb, "SELECT id FROM NodeAddData WHERE requestHash = ?", &selectNodeAddDataIdStatement); sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeRaw(node, data) VALUES(?, ?)", &insertNodeRawStmt); - sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataRaw(node, nodeAddData, data) VALUES(?, ?, ?)", &insertNodeAddDataRawStmt); + sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataRaw(nodeId, nodeAddDataId, data) VALUES(?, ?, ?)", &insertNodeAddDataRawStmt); sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddDataAdditional(nodeAddDataId, data) VALUES(?, ?)", &insertNodeAddDataAdditionalStmt); sqlite_prepare_checked(sqliteDb, "INSERT INTO NodeAddUserData(nodeAddDataId, userToAddPublicKey, groupId) VALUES(?, ?, ?)", &insertNodeAddUserDataStmt); @@ -456,9 +459,97 @@ namespace odhtdb decryptNodeData(hash, nodeDecryptionKeyResult.second, &adminPublicKey, adminGroupId, timestamp); } - void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView) + void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, DatabaseOperation operation, u64 newUserActionCounter, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const void *data, usize size, const DataView &additionalDataView) { sqlite3_exec(sqliteDb, "BEGIN", 0, 0, 0); + + { + SqlQuery selectUserIdAndActionCounter(sqliteDb, "SELECT id, latestActionCounter FROM NodeUser WHERE node = ? AND publicKey = ?", + { DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)creatorPublicKey.getData(), creatorPublicKey.getSize()) }); + if(!selectUserIdAndActionCounter.next()) + { + string errMsg = "User "; + errMsg += creatorPublicKey.toString(); + errMsg += " not found in node "; + errMsg += nodeHash.toString(); + throw DatabaseStorageNotFound(errMsg); + } + + i64 nodeUserRowId = selectUserIdAndActionCounter.getInt64(0); + u64 userActionCounter = selectUserIdAndActionCounter.getInt64(1); + + if(newUserActionCounter == userActionCounter) + { + sqlite3_exec(sqliteDb, "ROLLBACK", 0, 0, 0); + throw DatabaseStorageException("Got unique package but action counter was equal to users existing one, discarding packet"); + } + else if(newUserActionCounter == userActionCounter + 1) + { + SqlExec setUserActionCounter(sqliteDb, "UPDATE NodeUser SET latestActionCounter = ? WHERE id = ?"); + setUserActionCounter.execWithArgs({ + newUserActionCounter, + nodeUserRowId + }); + } + else + { + SqlQuery existingActionGap(sqliteDb, "SELECT id, start, range FROM NodeUserActionGap WHERE nodeUserId = ? AND ? >= start AND ? <= start + range", + { nodeUserRowId, newUserActionCounter, newUserActionCounter }); + if(existingActionGap.next()) + { + i64 actionGapRowId = existingActionGap.getInt64(0); + u64 start = existingActionGap.getInt64(1); + u64 range = existingActionGap.getInt64(2); + + SqlExec removeRange(sqliteDb, "DELETE FROM NodeUserActionGap WHERE id = ?"); + removeRange.execWithArgs({ actionGapRowId }); + + if(range == 1) + { + if(start + range > userActionCounter) + { + SqlExec setUserActionCounter(sqliteDb, "UPDATE NodeUser SET latestActionCounter = ? WHERE id = ?"); + setUserActionCounter.execWithArgs({ + start + range, + nodeUserRowId + }); + } + } + else + { + SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)"); + + u64 startBefore = start; + u64 rangeBefore = newUserActionCounter - start; + if(rangeBefore > 0) + addUserActionGap.execWithArgs({ nodeUserRowId, startBefore, rangeBefore }); + + u64 startAfter = newUserActionCounter + 1; + u64 rangeAfter = (start + range) - newUserActionCounter; + if(rangeAfter > 0) + addUserActionGap.execWithArgs({ nodeUserRowId, startAfter, rangeAfter }); + } + } + else + { + if(newUserActionCounter > userActionCounter + 1) + { + u64 start = userActionCounter + 1; + u64 range = newUserActionCounter - start; + SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)"); + addUserActionGap.execWithArgs({ nodeUserRowId, start, range }); + } + else if(newUserActionCounter < userActionCounter) + { + u64 start = newUserActionCounter; + u64 range = userActionCounter - start; + SqlExec addUserActionGap(sqliteDb, "INSERT INTO NodeUserActionGap(nodeUserId, start, range) VALUES(?, ?, ?)"); + addUserActionGap.execWithArgs({ nodeUserRowId, start, range }); + } + } + } + } + { sqlite3_reset(insertNodeAddDataStmt); sqlite3_clear_bindings(insertNodeAddDataStmt); @@ -482,6 +573,9 @@ namespace odhtdb rc = sqlite3_bind_int(insertNodeAddDataStmt, 6, 0); bindCheckError(rc); + rc = sqlite3_bind_int(insertNodeAddDataStmt, 7, newUserActionCounter); + bindCheckError(rc); + sqlite_step_rollback_on_failure(sqliteDb, insertNodeAddDataStmt, "insert data into NodeAddData"); } @@ -537,7 +631,7 @@ namespace odhtdb else { sqlite3_exec(sqliteDb, "ROLLBACK", 0, 0, 0); - throw std::runtime_error("Unexpected operation type"); + throw ("Unexpected operation type"); } { @@ -633,18 +727,44 @@ namespace odhtdb SqlQuery query(sqliteDb, "SELECT data FROM NodeRaw WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) }); while(query.next()) { - const DataView data = query.getBlob(0); - callbackFunc(data); + const DataView rawData = query.getBlob(0); + callbackFunc(rawData); } } void DatabaseStorage::fetchNodeAddDataRaw(const Hash &nodeHash, FetchNodeAddDataRawCallbackFunc callbackFunc) { - SqlQuery query(sqliteDb, "SELECT data FROM NodeAddDataRaw WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) }); + SqlQuery query(sqliteDb, "SELECT rawData.data, nodeAddData.creatorPublicKey, nodeAddData.userActionCounter From NodeAddData AS nodeAddData INNER JOIN NodeAddDataRaw AS rawData ON rawData.nodeAddDataId = nodeAddData.id WHERE nodeAddData.node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) }); while(query.next()) { - const DataView data = query.getBlob(0); - callbackFunc(data); + const DataView rawData = query.getBlob(0); + const DataView creatorPublicKey = query.getBlob(1); + u64 userActionCounter = query.getInt64(2); + callbackFunc(rawData, creatorPublicKey, userActionCounter); + } + } + + void DatabaseStorage::fetchNodeUserActionGaps(const Hash &nodeHash, FetchNodeUserActionGapsCallbackFunc callbackFunc) + { + SqlQuery query(sqliteDb, "SELECT user.publicKey, actionGap.start, actionGap.range FROM NodeUser AS user INNER JOIN NodeUserActionGap AS actionGap ON actionGap.nodeUserId = user.id WHERE user.node = ?", + { DataView(nodeHash.getData(), nodeHash.getSize()) }); + while(query.next()) + { + const DataView userPublicKey = query.getBlob(0); + u64 actionGapStart = query.getInt64(1); + u64 actionGapRange = query.getInt64(2); + callbackFunc(userPublicKey, actionGapStart, actionGapRange); + } + } + + void DatabaseStorage::fetchNodeUserLatestActionCounter(const Hash &nodeHash, FetchNodeUserLatestActionCounterCallbackFunc callbackFunc) + { + SqlQuery query(sqliteDb, "SELECT publicKey, latestActionCounter FROM NodeUser WHERE node = ?", { DataView(nodeHash.getData(), nodeHash.getSize()) }); + while(!query.next()) + { + const DataView userPublicKey = query.getBlob(0); + u64 latestActionCounter = query.getInt64(1); + callbackFunc(userPublicKey, latestActionCounter); } } @@ -677,6 +797,21 @@ namespace odhtdb { DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)userPublicKey.getData(), userPublicKey.getSize()), groupToAddPermissionLevel, (i64)PermissionType::ADD_USER_SAME_LEVEL, groupToAddPermissionLevel, (i64)PermissionType::ADD_USER_HIGHER_LEVEL }); return queryCreatorGroupWithRightsToAddUserToGroup.next(); } + + u64 DatabaseStorage::getUserActionCounter(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const + { + SqlQuery query(sqliteDb, "SELECT latestActionCounter FROM NodeUser WHERE node = ? AND publicKey = ?", + { DataView(nodeHash.getData(), nodeHash.getSize()), DataView((void*)userPublicKey.getData(), userPublicKey.getSize()) }); + if(!query.next()) + { + string errMsg = "User "; + errMsg += userPublicKey.toString(); + errMsg += " not found in node "; + errMsg += nodeHash.toString(); + throw DatabaseStorageNotFound(errMsg); + } + return query.getInt64(0); + } #if 0 bool DatabaseStorage::storeLocalUser(const string &username, const Signature::KeyPair &keyPair, const string &password) { diff --git a/src/sql/Sql.cpp b/src/sql/Sql.cpp new file mode 100644 index 0000000..754a30d --- /dev/null +++ b/src/sql/Sql.cpp @@ -0,0 +1,21 @@ +#include "../../include/odhtdb/sql/Sql.hpp" +#include <sqlite3.h> + +namespace odhtdb +{ + int SqlArg::bind(sqlite3_stmt *stmt, int paramIndex) const + { + switch(type) + { + case Type::DATA_VIEW: + return sqlite3_bind_blob(stmt, paramIndex, dataView.data, dataView.size, SQLITE_STATIC); + case Type::INT: + return sqlite3_bind_int(stmt, paramIndex, integer); + case Type::INT64: + return sqlite3_bind_int64(stmt, paramIndex, integer64); + case Type::UINT64: // TODO: Find a way to use u64 in sqlite + return sqlite3_bind_int64(stmt, paramIndex, uinteger64); + } + return SQLITE_OK; + } +} diff --git a/src/sql/SqlExec.cpp b/src/sql/SqlExec.cpp new file mode 100644 index 0000000..732b2f1 --- /dev/null +++ b/src/sql/SqlExec.cpp @@ -0,0 +1,72 @@ +#include "../../include/odhtdb/sql/SqlExec.hpp" +#include <sqlite3.h> + +namespace odhtdb +{ + SqlExec::SqlExec(sqlite3 *_db, const char *sql) : + db(_db), + stmt(nullptr) + { + int rc = sqlite3_prepare_v2(db, sql, -1, &stmt, nullptr); + if(rc != SQLITE_OK) + { + std::string errMsg = "Failed to prepare sqlite statement, error: "; + errMsg += sqlite3_errmsg(db); + sqlite3_exec(db, "ROLLBACK", 0, 0, 0); + throw SqlExecException(errMsg); + } + } + + SqlExec::~SqlExec() + { + sqlite3_finalize(stmt); + } + + void SqlExec::execWithArgs(std::initializer_list<SqlArg> args) + { + std::lock_guard<std::mutex> lock(mutex); + + sqlite3_reset(stmt); + sqlite3_clear_bindings(stmt); + + int numParams = sqlite3_bind_parameter_count(stmt); + if(args.size() != numParams) + { + std::string errMsg = "Failed to prepare sqlite statement, error: Sql has "; + errMsg += std::to_string(numParams); + errMsg += " parameters, got "; + errMsg += std::to_string(args.size()); + errMsg += " arguments"; + sqlite3_exec(db, "ROLLBACK", 0, 0, 0); + throw SqlExecException(errMsg); + } + + int paramIndex = 1; + for(const SqlArg &arg : args) + { + int rc = arg.bind(stmt, paramIndex); + if(rc != SQLITE_OK) + { + std::string errMsg = "Failed to bind param, error code: "; + errMsg += std::to_string(rc); + sqlite3_exec(db, "ROLLBACK", 0, 0, 0); + throw SqlExecException(errMsg); + } + ++paramIndex; + } + + int rc = sqlite3_step(stmt); + if(rc != SQLITE_DONE) + { + std::string errMsg = "Failed to perform sql exec, error: "; + errMsg += sqlite3_errmsg(db); + sqlite3_exec(db, "ROLLBACK", 0, 0, 0); + throw SqlExecException(errMsg); + } + } + + void SqlExec::exec() + { + execWithArgs({}); + } +} diff --git a/src/sql/SqlQuery.cpp b/src/sql/SqlQuery.cpp index 47f1463..b99f92d 100644 --- a/src/sql/SqlQuery.cpp +++ b/src/sql/SqlQuery.cpp @@ -3,20 +3,6 @@ namespace odhtdb { - int SqlArg::bind(sqlite3_stmt *stmt, int paramIndex) const - { - switch(type) - { - case Type::DATA_VIEW: - return sqlite3_bind_blob(stmt, paramIndex, dataView.data, dataView.size, SQLITE_STATIC); - case Type::INT: - return sqlite3_bind_int(stmt, paramIndex, integer); - case Type::INT64: - return sqlite3_bind_int64(stmt, paramIndex, integer64); - } - return SQLITE_OK; - } - SqlQuery::SqlQuery(sqlite3 *_db, const char *sql, std::initializer_list<SqlArg> args) : db(_db), stmt(nullptr), |