diff options
author | dec05eba <dec05eba@protonmail.com> | 2018-03-13 03:03:11 +0100 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-08-18 23:25:46 +0200 |
commit | c7740f0e3cbcd9a7233258f22e6168b1cd8853a8 (patch) | |
tree | 21292e24cb7c534d27d8a0600a033977312cfffc /src | |
parent | 6099ec04bd0d98b9e75f5b55b1215c94ccf20202 (diff) |
Fix add data operation not working correctly
Reminder: do not get reference to hash map value... duh
Add thread-safe logging (log is in order now!).
Store data immediately to database when WE add it instead of waiting for
response from remote peers.
TODO: Test with multiple peers (not only localhost)
Diffstat (limited to 'src')
-rw-r--r-- | src/Database.cpp | 75 | ||||
-rw-r--r-- | src/DatabaseStorage.cpp | 31 | ||||
-rw-r--r-- | src/Log.cpp | 40 | ||||
-rw-r--r-- | src/User.cpp | 7 |
4 files changed, 107 insertions, 46 deletions
diff --git a/src/Database.cpp b/src/Database.cpp index fc1a69b..6f53f1d 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,6 +5,7 @@ #include "../include/Encryption.hpp" #include "../include/DhtKey.hpp" #include "../include/bin2hex.hpp" +#include "../include/Log.hpp" #include <boost/uuid/uuid_generators.hpp> #include <opendht.h> #include <fmt/format.h> @@ -150,12 +151,12 @@ namespace odhtdb // (only if there are plenty of other seeders for the cached files. This could also cause race issue // where all nodes with a cached file delete it at same time) - printf("Seeding key: %s\n", hash->toString().c_str()); + Log::debug("Seeding key: %s\n", hash->toString().c_str()); DhtKey dhtKey(*hash); node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr<Value> &value) { - printf("Seed: New data listener received data...\n"); + Log::debug("Seed: New data listener received data...\n"); const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *hash) return true; @@ -181,7 +182,7 @@ namespace odhtdb // This is to prevent too many peers from responding to a request to get old data. node.listen(dhtKey.getRequestOldDataKey(), [this, hash](const shared_ptr<Value> &value) { - printf("Request: Got request to send old data\n"); + Log::debug("Request: Got request to send old data\n"); try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); @@ -192,32 +193,34 @@ namespace odhtdb auto requestedData = databaseStorage.getStorage(*hash); if(!requestedData) { - fprintf(stderr, "Warning: No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); + Log::warn("No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); return true; } + InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN); + if(dataStartTimestamp == 0) { - printf("Request: Sent create packet to requesting peer\n"); - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) + Log::debug("Request: Sent create packet to requesting peer\n"); + node.put(requestResponseInfoHash, Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'create' data\n"); + Log::warn("Failed to put response for old data for 'create' data\n"); }); } for(auto requestedObject : requestedData->objects) { - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) + node.put(requestResponseInfoHash, Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'add' data\n"); + Log::warn("Failed to put response for old data for 'add' data\n"); }); } } catch (sibs::DeserializeException &e) { - fprintf(stderr, "Warning: Failed to deserialize 'get old data' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'get old data' request: %s\n", e.what()); } return true; }); @@ -228,7 +231,7 @@ namespace odhtdb node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put request to get old data\n"); + Log::warn("Failed to put request to get old data\n"); }); //node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); @@ -295,7 +298,8 @@ namespace odhtdb string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); free(requestData.data); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); stagedAddObjects.emplace_back(make_unique<StagedObject>(stagedAddObject, nodeInfo.getRequestHash())); } @@ -344,24 +348,26 @@ namespace odhtdb DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() }; string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); + auto userToAdd = RemoteUser::create(userToAddPublicKey, userToAddName, groupToAddUserTo); + databaseStorage.addUser(userToAdd, *nodeInfo.getRequestHash()); stagedAddObjects.emplace_back(make_unique<StagedObject>(stagedAddObject, nodeInfo.getRequestHash())); } void Database::commit() { // TODO: Combine staged objects into one object for efficiency. - // TODO: Add rollback try { - printf("Num objects to create: %zu\n", stagedCreateObjects.size()); + Log::debug("Num objects to create: %zu\n", stagedCreateObjects.size()); for(const auto &stagedObject : stagedCreateObjects) { commitStagedCreateObject(stagedObject); } - printf("Num objects to add: %zu\n", stagedAddObjects.size()); + Log::debug("Num objects to add: %zu\n", stagedAddObjects.size()); for(const auto &stagedObject : stagedAddObjects) { commitStagedAddObject(stagedObject); @@ -369,7 +375,8 @@ namespace odhtdb } catch (exception &e) { - fprintf(stderr, "Error: Failed to commit, reason: %s\n", e.what()); + // TODO: Add rollback + Log::error("Failed to commit, reason: %s\n", e.what()); } for(const auto &stagedObject : stagedCreateObjects) @@ -395,7 +402,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedCreateObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -407,7 +414,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedAddObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -483,7 +490,7 @@ namespace odhtdb return false; } - void Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<char*> encryptionKey) + void Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &requestDataHash, const shared_ptr<char*> encryptionKey) { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; @@ -518,14 +525,14 @@ namespace odhtdb // The user (public key) could belong to a node but we might not have retrieved the node info yet since data may // not be retrieved in order. // Data in quarantine is processed when 'create' packet is received or removed after 60 seconds - databaseStorage.addToQuarantine(creatorPublicKey, creationDate, value->data.data(), value->data.size()); + databaseStorage.addToQuarantine(requestDataHash, creatorPublicKey, creationDate, value->data.data(), value->data.size()); throw RequestQuarantineException(); } auto creatorUser = databaseStorage.getUserByPublicKey(creatorPublicKey); // TODO: Verify there isn't already data with same timestamp for this node. Same for quarantine. // TODO: We might receive 'add' data packet before 'create'. If that happens, we should put it in quarantine and process it later. - databaseStorage.appendStorage(*node, creatorUser, creationDate, value->data.data(), value->data.size()); + databaseStorage.appendStorage(*node, requestDataHash, creatorUser, creationDate, value->data.data(), value->data.size()); if(operation == DatabaseOperation::ADD_DATA) { @@ -551,7 +558,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); + Log::debug("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); } else if(operation == DatabaseOperation::ADD_USER) { @@ -572,7 +579,7 @@ namespace odhtdb if(group) { auto user = RemoteUser::create(userToAddPublicKey, name, group); - databaseStorage.addUser(user, hash); + databaseStorage.addUser(user, *node); auto creatorUserGroupWithRights = getGroupWithRightsToAddUserToGroup(creatorUser->getGroups(), group); if(!creatorUserGroupWithRights) @@ -589,7 +596,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); + Log::debug("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); } else { @@ -606,36 +613,36 @@ namespace odhtdb bool Database::listenCreateData(std::shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<char*> encryptionKey) { - printf("Got create data\n"); + Log::debug("Got create data\n"); try { if(databaseStorage.getStorage(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); DatabaseCreateRequest createObject = deserializeCreateRequest(value, hash, encryptionKey); - printf("Got create object, name: %s\n", createObject.name.c_str()); + Log::debug("Got create object, name: %s\n", createObject.name.c_str()); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'create' request: %s\n", e.what()); } return true; } - bool Database::listenAddData(std::shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<char*> encryptionKey) + bool Database::listenAddData(std::shared_ptr<dht::Value> value, const Hash &requestDataHash, const shared_ptr<char*> encryptionKey) { - printf("Got add data\n"); + Log::debug("Got add data\n"); try { - deserializeAddRequest(value, hash, encryptionKey); - //printf("Got add object, timestamp: %zu\n", addObject.timestamp); + deserializeAddRequest(value, requestDataHash, encryptionKey); + //Log::debug("Got add object, timestamp: %zu\n", addObject.timestamp); } catch (RequestQuarantineException &e) { - fprintf(stderr, "Warning: Request was put in quarantine, will be processed later"); + Log::warn("Request was put in quarantine, will be processed later"); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'add' request: %s\n", e.what()); } return true; } diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index 7c86d18..0f75d1f 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -47,33 +47,52 @@ namespace odhtdb } } - void DatabaseStorage::appendStorage(const Hash &hash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize) + void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize) { - auto it = storageMap.find(hash); + auto it = storageMap.find(nodeHash); if(it == storageMap.end()) { string errMsg = "Database storage with hash "; - errMsg += hash.toString(); + errMsg += nodeHash.toString(); errMsg += " not found. Storage for a hash needs to be created before data can be appended to it"; throw DatabaseStorageNotFound(errMsg); } + auto storedDataIt = storedDataHash.find(dataHash); + if(storedDataIt != storedDataHash.end()) + { + string errMsg = "Database already contains data with hash: "; + errMsg += dataHash.toString(); + throw DatabaseStorageAlreadyExists(errMsg); + } + DataView storageData { new u8[dataSize], dataSize }; + memcpy(storageData.data, data, dataSize); DatabaseStorageObject *databaseStorageObject = new DatabaseStorageObject(storageData, timestamp, creatorUser->getPublicKey()); it->second->objects.push_back(databaseStorageObject); + storedDataHash.insert(dataHash); } - void DatabaseStorage::addToQuarantine(const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize) + void DatabaseStorage::addToQuarantine(const Hash &dataHash, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize) { + auto storedDataIt = storedDataHash.find(dataHash); + if(storedDataIt != storedDataHash.end()) + { + string errMsg = "Database already contains data with hash: "; + errMsg += dataHash.toString(); + throw DatabaseStorageAlreadyExists(errMsg); + } + DataView storageData { new u8[dataSize], dataSize }; memcpy(storageData.data, data, dataSize); DatabaseStorageQuarantineObject *databaseQuarantineStorageObject = new DatabaseStorageQuarantineObject(storageData, timestamp, creatorPublicKey); quarantineStorageMap[creatorPublicKey].emplace_back(databaseQuarantineStorageObject); + storedDataHash.insert(dataHash); } void DatabaseStorage::addUser(User *user, const Hash &hash) { - userPublicKeyNodeMap[user->getPublicKey()] = hash; + userPublicKeyNodeMap[user->getPublicKey()] = new Hash(hash); publicKeyUserMap[user->getPublicKey()] = user; } @@ -89,7 +108,7 @@ namespace odhtdb { auto it = userPublicKeyNodeMap.find(userPublicKey); if(it != userPublicKeyNodeMap.end()) - return &it->second; + return it->second; return nullptr; } diff --git a/src/Log.cpp b/src/Log.cpp new file mode 100644 index 0000000..5d77d73 --- /dev/null +++ b/src/Log.cpp @@ -0,0 +1,40 @@ +#include "../include/Log.hpp" +#include <cstdarg> +#include <mutex> + +static std::mutex mutexDebug; + +namespace odhtdb +{ + // TODO: Add color (if output is tty)? + + void Log::debug(const char *fmt, ...) + { + std::lock_guard<std::mutex> lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Debug: ", stdout); + vfprintf(stdout, fmt, args); + va_end(args); + } + + void Log::warn(const char *fmt, ...) + { + std::lock_guard<std::mutex> lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Warning: ", stdout); + vfprintf(stdout, fmt, args); + va_end(args); + } + + void Log::error(const char *fmt, ...) + { + std::lock_guard<std::mutex> lock(mutexDebug); + va_list args; + va_start(args, fmt); + fputs("Error: ", stderr); + vfprintf(stderr, fmt, args); + va_end(args); + } +} diff --git a/src/User.cpp b/src/User.cpp index e2017ff..4ec93cd 100644 --- a/src/User.cpp +++ b/src/User.cpp @@ -7,12 +7,7 @@ namespace odhtdb { if(name.size() > 255) throw UserNameTooLongException(name); - - if(group) - { - groups.emplace_back(group); - group->addUser(this); - } + addToGroup(group); } void User::addToGroup(Group *group) |