From c7740f0e3cbcd9a7233258f22e6168b1cd8853a8 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Tue, 13 Mar 2018 03:03:11 +0100 Subject: Fix add data operation not working correctly Reminder: do not get reference to hash map value... duh Add thread-safe logging (log is in order now!). Store data immediately to database when WE add it instead of waiting for response from remote peers. TODO: Test with multiple peers (not only localhost) --- README.md | 6 ++++ include/Database.hpp | 4 +-- include/DatabaseStorage.hpp | 13 +++++--- include/Hash.hpp | 5 ++- include/Log.hpp | 12 ++++++++ src/Database.cpp | 75 +++++++++++++++++++++++++-------------------- src/DatabaseStorage.cpp | 31 +++++++++++++++---- src/Log.cpp | 40 ++++++++++++++++++++++++ src/User.cpp | 7 +---- tests/main.cpp | 9 +++--- 10 files changed, 144 insertions(+), 58 deletions(-) create mode 100644 include/Log.hpp create mode 100644 src/Log.cpp diff --git a/README.md b/README.md index d5035d5..7a81580 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,15 @@ Only 65kb of data can be used for each `add`. You can add more data by using `ad User and group name can't be longer than 255 characters. # TODO +## Data limit +Allow more than 65kb of data to be added at once ## Node banning Ban nodes that spam put or get (malicious nodes). If data is routed, then the router node should first ban the malicious node so the router node is not banned if it's not malicious. But how can we know if data was routed? does opendht expose this to other nodes in some way? + +Another method could be for nodes to be required to generate a key pair that takes long time to generate and sign messages with them. +If we are spammed with the same key pair, we can ban messages that comes from that key pair. The malicious node would require to generate a new key pair +which would take some time (we can make key generation take 60 seconds). ## Error handling Currently operations are executed without knowing if they succeed or not. Operations should be modified to perhaps return std::future or use a callback function which is called with the operation result. ## Safely store private keys diff --git a/include/Database.hpp b/include/Database.hpp index 64a381c..6bc7bc5 100644 --- a/include/Database.hpp +++ b/include/Database.hpp @@ -141,9 +141,9 @@ namespace odhtdb void commitStagedAddObject(const std::unique_ptr &stagedObject); ntp::NtpTimestamp getSyncedTimestampUtc() const; DatabaseCreateRequest deserializeCreateRequest(const std::shared_ptr &value, const Hash &hash, const std::shared_ptr encryptionKey); - void deserializeAddRequest(const std::shared_ptr &value, const Hash &hash, const std::shared_ptr encryptionKey); + void deserializeAddRequest(const std::shared_ptr &value, const Hash &requestDataHash, const std::shared_ptr encryptionKey); bool listenCreateData(std::shared_ptr value, const Hash &hash, const std::shared_ptr encryptionKey); - bool listenAddData(std::shared_ptr value, const Hash &hash, const std::shared_ptr encryptionKey); + bool listenAddData(std::shared_ptr value, const Hash &requestDataHash, const std::shared_ptr encryptionKey); private: dht::DhtRunner node; std::vector> stagedCreateObjects; diff --git a/include/DatabaseStorage.hpp b/include/DatabaseStorage.hpp index ee4d2ad..ad4f70b 100644 --- a/include/DatabaseStorage.hpp +++ b/include/DatabaseStorage.hpp @@ -52,7 +52,7 @@ namespace odhtdb DatabaseStorageNotFound(const std::string &errMsg) : std::runtime_error(errMsg) {} }; - using DatabaseStorageMap = MapHashKey; + using DatabaseStorageMap = MapHash; using DatabaseStorageQuarantineMap = Signature::MapPublicKey>; class DatabaseStorage @@ -61,10 +61,12 @@ namespace odhtdb // Throws DatabaseStorageAlreadyExists if data with hash already exists void createStorage(const Hash &hash, Group *creatorGroup, u64 timestamp, const u8 *data, usize dataSize); - // Throws DatabaseStorageNotFound if data with hash does not exist - void appendStorage(const Hash &hash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize); + // Throws DatabaseStorageNotFound if data with @nodeHash hash has not been created yet. + // Throws DatabaseStorageAlreadyExists if same data has been added before (hash of @data, in @dataHash) + void appendStorage(const Hash &nodeHash, const Hash &dataHash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize); - void addToQuarantine(const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize); + // Throws DatabaseStorageAlreadyExists if same data has been added before (hash of @data, in @dataHash) + void addToQuarantine(const Hash &dataHash, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize); void addUser(User *user, const Hash &hash); @@ -85,7 +87,8 @@ namespace odhtdb private: DatabaseStorageMap storageMap; DatabaseStorageQuarantineMap quarantineStorageMap; - Signature::MapPublicKey userPublicKeyNodeMap; + SetHash storedDataHash; // Prevent duplicate data from being added + Signature::MapPublicKey userPublicKeyNodeMap; Signature::MapPublicKey publicKeyUserMap; DataViewMap groupByIdMap; }; diff --git a/include/Hash.hpp b/include/Hash.hpp index bd87b69..9dce168 100644 --- a/include/Hash.hpp +++ b/include/Hash.hpp @@ -3,6 +3,7 @@ #include "utils.hpp" #include #include +#include namespace odhtdb { @@ -51,5 +52,7 @@ namespace odhtdb }; template - using MapHashKey = std::unordered_map; + using MapHash = std::unordered_map; + + using SetHash = std::unordered_set; } diff --git a/include/Log.hpp b/include/Log.hpp new file mode 100644 index 0000000..d09c2a2 --- /dev/null +++ b/include/Log.hpp @@ -0,0 +1,12 @@ +#pragma once + +namespace odhtdb +{ + class Log + { + public: + static void debug(const char *fmt, ...); + static void warn(const char *fmt, ...); + static void error(const char *fmt, ...); + }; +} diff --git a/src/Database.cpp b/src/Database.cpp index fc1a69b..6f53f1d 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,6 +5,7 @@ #include "../include/Encryption.hpp" #include "../include/DhtKey.hpp" #include "../include/bin2hex.hpp" +#include "../include/Log.hpp" #include #include #include @@ -150,12 +151,12 @@ namespace odhtdb // (only if there are plenty of other seeders for the cached files. This could also cause race issue // where all nodes with a cached file delete it at same time) - printf("Seeding key: %s\n", hash->toString().c_str()); + Log::debug("Seeding key: %s\n", hash->toString().c_str()); DhtKey dhtKey(*hash); node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr &value) { - printf("Seed: New data listener received data...\n"); + Log::debug("Seed: New data listener received data...\n"); const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *hash) return true; @@ -181,7 +182,7 @@ namespace odhtdb // This is to prevent too many peers from responding to a request to get old data. node.listen(dhtKey.getRequestOldDataKey(), [this, hash](const shared_ptr &value) { - printf("Request: Got request to send old data\n"); + Log::debug("Request: Got request to send old data\n"); try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); @@ -192,32 +193,34 @@ namespace odhtdb auto requestedData = databaseStorage.getStorage(*hash); if(!requestedData) { - fprintf(stderr, "Warning: No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); + Log::warn("No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); return true; } + InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN); + if(dataStartTimestamp == 0) { - printf("Request: Sent create packet to requesting peer\n"); - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) + Log::debug("Request: Sent create packet to requesting peer\n"); + node.put(requestResponseInfoHash, Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'create' data\n"); + Log::warn("Failed to put response for old data for 'create' data\n"); }); } for(auto requestedObject : requestedData->objects) { - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) + node.put(requestResponseInfoHash, Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'add' data\n"); + Log::warn("Failed to put response for old data for 'add' data\n"); }); } } catch (sibs::DeserializeException &e) { - fprintf(stderr, "Warning: Failed to deserialize 'get old data' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'get old data' request: %s\n", e.what()); } return true; }); @@ -228,7 +231,7 @@ namespace odhtdb node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put request to get old data\n"); + Log::warn("Failed to put request to get old data\n"); }); //node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); @@ -295,7 +298,8 @@ namespace odhtdb string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); free(requestData.data); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); stagedAddObjects.emplace_back(make_unique(stagedAddObject, nodeInfo.getRequestHash())); } @@ -344,24 +348,26 @@ namespace odhtdb DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() }; string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); + auto userToAdd = RemoteUser::create(userToAddPublicKey, userToAddName, groupToAddUserTo); + databaseStorage.addUser(userToAdd, *nodeInfo.getRequestHash()); stagedAddObjects.emplace_back(make_unique(stagedAddObject, nodeInfo.getRequestHash())); } void Database::commit() { // TODO: Combine staged objects into one object for efficiency. - // TODO: Add rollback try { - printf("Num objects to create: %zu\n", stagedCreateObjects.size()); + Log::debug("Num objects to create: %zu\n", stagedCreateObjects.size()); for(const auto &stagedObject : stagedCreateObjects) { commitStagedCreateObject(stagedObject); } - printf("Num objects to add: %zu\n", stagedAddObjects.size()); + Log::debug("Num objects to add: %zu\n", stagedAddObjects.size()); for(const auto &stagedObject : stagedAddObjects) { commitStagedAddObject(stagedObject); @@ -369,7 +375,8 @@ namespace odhtdb } catch (exception &e) { - fprintf(stderr, "Error: Failed to commit, reason: %s\n", e.what()); + // TODO: Add rollback + Log::error("Failed to commit, reason: %s\n", e.what()); } for(const auto &stagedObject : stagedCreateObjects) @@ -395,7 +402,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedCreateObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -407,7 +414,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedAddObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -483,7 +490,7 @@ namespace odhtdb return false; } - void Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) + void Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &requestDataHash, const shared_ptr encryptionKey) { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; @@ -518,14 +525,14 @@ namespace odhtdb // The user (public key) could belong to a node but we might not have retrieved the node info yet since data may // not be retrieved in order. // Data in quarantine is processed when 'create' packet is received or removed after 60 seconds - databaseStorage.addToQuarantine(creatorPublicKey, creationDate, value->data.data(), value->data.size()); + databaseStorage.addToQuarantine(requestDataHash, creatorPublicKey, creationDate, value->data.data(), value->data.size()); throw RequestQuarantineException(); } auto creatorUser = databaseStorage.getUserByPublicKey(creatorPublicKey); // TODO: Verify there isn't already data with same timestamp for this node. Same for quarantine. // TODO: We might receive 'add' data packet before 'create'. If that happens, we should put it in quarantine and process it later. - databaseStorage.appendStorage(*node, creatorUser, creationDate, value->data.data(), value->data.size()); + databaseStorage.appendStorage(*node, requestDataHash, creatorUser, creationDate, value->data.data(), value->data.size()); if(operation == DatabaseOperation::ADD_DATA) { @@ -551,7 +558,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); + Log::debug("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); } else if(operation == DatabaseOperation::ADD_USER) { @@ -572,7 +579,7 @@ namespace odhtdb if(group) { auto user = RemoteUser::create(userToAddPublicKey, name, group); - databaseStorage.addUser(user, hash); + databaseStorage.addUser(user, *node); auto creatorUserGroupWithRights = getGroupWithRightsToAddUserToGroup(creatorUser->getGroups(), group); if(!creatorUserGroupWithRights) @@ -589,7 +596,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); + Log::debug("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); } else { @@ -606,36 +613,36 @@ namespace odhtdb bool Database::listenCreateData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) { - printf("Got create data\n"); + Log::debug("Got create data\n"); try { if(databaseStorage.getStorage(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); DatabaseCreateRequest createObject = deserializeCreateRequest(value, hash, encryptionKey); - printf("Got create object, name: %s\n", createObject.name.c_str()); + Log::debug("Got create object, name: %s\n", createObject.name.c_str()); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'create' request: %s\n", e.what()); } return true; } - bool Database::listenAddData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) + bool Database::listenAddData(std::shared_ptr value, const Hash &requestDataHash, const shared_ptr encryptionKey) { - printf("Got add data\n"); + Log::debug("Got add data\n"); try { - deserializeAddRequest(value, hash, encryptionKey); - //printf("Got add object, timestamp: %zu\n", addObject.timestamp); + deserializeAddRequest(value, requestDataHash, encryptionKey); + //Log::debug("Got add object, timestamp: %zu\n", addObject.timestamp); } catch (RequestQuarantineException &e) { - fprintf(stderr, "Warning: Request was put in quarantine, will be processed later"); + Log::warn("Request was put in quarantine, will be processed later"); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'add' request: %s\n", e.what()); } return true; } diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index 7c86d18..0f75d1f 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -47,33 +47,52 @@ namespace odhtdb } } - void DatabaseStorage::appendStorage(const Hash &hash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize) + void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize) { - auto it = storageMap.find(hash); + auto it = storageMap.find(nodeHash); if(it == storageMap.end()) { string errMsg = "Database storage with hash "; - errMsg += hash.toString(); + errMsg += nodeHash.toString(); errMsg += " not found. Storage for a hash needs to be created before data can be appended to it"; throw DatabaseStorageNotFound(errMsg); } + auto storedDataIt = storedDataHash.find(dataHash); + if(storedDataIt != storedDataHash.end()) + { + string errMsg = "Database already contains data with hash: "; + errMsg += dataHash.toString(); + throw DatabaseStorageAlreadyExists(errMsg); + } + DataView storageData { new u8[dataSize], dataSize }; + memcpy(storageData.data, data, dataSize); DatabaseStorageObject *databaseStorageObject = new DatabaseStorageObject(storageData, timestamp, creatorUser->getPublicKey()); it->second->objects.push_back(databaseStorageObject); + storedDataHash.insert(dataHash); } - void DatabaseStorage::addToQuarantine(const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize) + void DatabaseStorage::addToQuarantine(const Hash &dataHash, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize) { + auto storedDataIt = storedDataHash.find(dataHash); + if(storedDataIt != storedDataHash.end()) + { + string errMsg = "Database already contains data with hash: "; + errMsg += dataHash.toString(); + throw DatabaseStorageAlreadyExists(errMsg); + } + DataView storageData { new u8[dataSize], dataSize }; memcpy(storageData.data, data, dataSize); DatabaseStorageQuarantineObject *databaseQuarantineStorageObject = new DatabaseStorageQuarantineObject(storageData, timestamp, creatorPublicKey); quarantineStorageMap[creatorPublicKey].emplace_back(databaseQuarantineStorageObject); + storedDataHash.insert(dataHash); } void DatabaseStorage::addUser(User *user, const Hash &hash) { - userPublicKeyNodeMap[user->getPublicKey()] = hash; + userPublicKeyNodeMap[user->getPublicKey()] = new Hash(hash); publicKeyUserMap[user->getPublicKey()] = user; } @@ -89,7 +108,7 @@ namespace odhtdb { auto it = userPublicKeyNodeMap.find(userPublicKey); if(it != userPublicKeyNodeMap.end()) - return &it->second; + return it->second; return nullptr; } diff --git a/src/Log.cpp b/src/Log.cpp new file mode 100644 index 0000000..5d77d73 --- /dev/null +++ b/src/Log.cpp @@ -0,0 +1,40 @@ +#include "../include/Log.hpp" +#include +#include + +static std::mutex mutexDebug; + +namespace odhtdb +{ + // TODO: Add color (if output is tty)? + + void Log::debug(const char *fmt, ...) + { + std::lock_guard lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Debug: ", stdout); + vfprintf(stdout, fmt, args); + va_end(args); + } + + void Log::warn(const char *fmt, ...) + { + std::lock_guard lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Warning: ", stdout); + vfprintf(stdout, fmt, args); + va_end(args); + } + + void Log::error(const char *fmt, ...) + { + std::lock_guard lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Error: ", stderr); + vfprintf(stderr, fmt, args); + va_end(args); + } +} diff --git a/src/User.cpp b/src/User.cpp index e2017ff..4ec93cd 100644 --- a/src/User.cpp +++ b/src/User.cpp @@ -7,12 +7,7 @@ namespace odhtdb { if(name.size() > 255) throw UserNameTooLongException(name); - - if(group) - { - groups.emplace_back(group); - group->addUser(this); - } + addToGroup(group); } void User::addToGroup(Group *group) diff --git a/tests/main.cpp b/tests/main.cpp index 4d3bcc3..9992eee 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -1,4 +1,5 @@ #include "assert.hpp" +#include "../include/Log.hpp" #include "../include/Database.hpp" #include "../include/Group.hpp" #include "../include/LocalUser.hpp" @@ -16,16 +17,16 @@ void testHash() { Hash hash("odhtdb", 6); assertEquals("a7b30ec8ab92de60e551b26bb8f78d315697f84dd7f5549a143477e095ec934f", hash.toString()); - printf("hash of 'odhtdb' is: a7b30ec8ab92de60e551b26bb8f78d315697f84dd7f5549a143477e095ec934f\n"); + Log::debug("hash of 'odhtdb' is: a7b30ec8ab92de60e551b26bb8f78d315697f84dd7f5549a143477e095ec934f\n"); } void testSignData(LocalUser *localUser) { std::string publicKeyStr = localUser->getPublicKey().toString(); - printf("Local user public key: %s\n", publicKeyStr.c_str()); + Log::debug("Local user public key: %s\n", publicKeyStr.c_str()); std::string privateKeyStr = localUser->getPrivateKey().toString(); - printf("Local user private key: %s\n", privateKeyStr.c_str()); + Log::debug("Local user private key: %s\n", privateKeyStr.c_str()); string expectedUnsignedData = "hello, world!"; string signedData = localUser->getPrivateKey().sign(DataView((void*)expectedUnsignedData.data(), expectedUnsignedData.size())); @@ -82,7 +83,7 @@ void testEncryption() int main() { - printf("Starting tests...\n"); + Log::debug("Starting tests...\n"); LocalUser *localUser = LocalUser::create(Signature::KeyPair(), "dec05eba", nullptr); testSignData(localUser); testEncryption(); -- cgit v1.2.3