From c7740f0e3cbcd9a7233258f22e6168b1cd8853a8 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Tue, 13 Mar 2018 03:03:11 +0100 Subject: Fix add data operation not working correctly Reminder: do not get reference to hash map value... duh Add thread-safe logging (log is in order now!). Store data immediately to database when WE add it instead of waiting for response from remote peers. TODO: Test with multiple peers (not only localhost) --- src/Database.cpp | 75 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 41 insertions(+), 34 deletions(-) (limited to 'src/Database.cpp') diff --git a/src/Database.cpp b/src/Database.cpp index fc1a69b..6f53f1d 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,6 +5,7 @@ #include "../include/Encryption.hpp" #include "../include/DhtKey.hpp" #include "../include/bin2hex.hpp" +#include "../include/Log.hpp" #include #include #include @@ -150,12 +151,12 @@ namespace odhtdb // (only if there are plenty of other seeders for the cached files. This could also cause race issue // where all nodes with a cached file delete it at same time) - printf("Seeding key: %s\n", hash->toString().c_str()); + Log::debug("Seeding key: %s\n", hash->toString().c_str()); DhtKey dhtKey(*hash); node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr &value) { - printf("Seed: New data listener received data...\n"); + Log::debug("Seed: New data listener received data...\n"); const Hash requestHash(value->data.data(), value->data.size()); if(requestHash == *hash) return true; @@ -181,7 +182,7 @@ namespace odhtdb // This is to prevent too many peers from responding to a request to get old data. node.listen(dhtKey.getRequestOldDataKey(), [this, hash](const shared_ptr &value) { - printf("Request: Got request to send old data\n"); + Log::debug("Request: Got request to send old data\n"); try { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); @@ -192,32 +193,34 @@ namespace odhtdb auto requestedData = databaseStorage.getStorage(*hash); if(!requestedData) { - fprintf(stderr, "Warning: No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); + Log::warn("No data found for hash %s, unable to serve peer\n", hash->toString().c_str()); return true; } + InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN); + if(dataStartTimestamp == 0) { - printf("Request: Sent create packet to requesting peer\n"); - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) + Log::debug("Request: Sent create packet to requesting peer\n"); + node.put(requestResponseInfoHash, Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'create' data\n"); + Log::warn("Failed to put response for old data for 'create' data\n"); }); } for(auto requestedObject : requestedData->objects) { - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) + node.put(requestResponseInfoHash, Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data for 'add' data\n"); + Log::warn("Failed to put response for old data for 'add' data\n"); }); } } catch (sibs::DeserializeException &e) { - fprintf(stderr, "Warning: Failed to deserialize 'get old data' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'get old data' request: %s\n", e.what()); } return true; }); @@ -228,7 +231,7 @@ namespace odhtdb node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put request to get old data\n"); + Log::warn("Failed to put request to get old data\n"); }); //node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1)); @@ -295,7 +298,8 @@ namespace odhtdb string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); free(requestData.data); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); stagedAddObjects.emplace_back(make_unique(stagedAddObject, nodeInfo.getRequestHash())); } @@ -344,24 +348,26 @@ namespace odhtdb DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() }; string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData); DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData); - // TODO: Add add object to database storage here for local user + Hash requestDataHash(stagedAddObject.data, stagedAddObject.size); + databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size); + auto userToAdd = RemoteUser::create(userToAddPublicKey, userToAddName, groupToAddUserTo); + databaseStorage.addUser(userToAdd, *nodeInfo.getRequestHash()); stagedAddObjects.emplace_back(make_unique(stagedAddObject, nodeInfo.getRequestHash())); } void Database::commit() { // TODO: Combine staged objects into one object for efficiency. - // TODO: Add rollback try { - printf("Num objects to create: %zu\n", stagedCreateObjects.size()); + Log::debug("Num objects to create: %zu\n", stagedCreateObjects.size()); for(const auto &stagedObject : stagedCreateObjects) { commitStagedCreateObject(stagedObject); } - printf("Num objects to add: %zu\n", stagedAddObjects.size()); + Log::debug("Num objects to add: %zu\n", stagedAddObjects.size()); for(const auto &stagedObject : stagedAddObjects) { commitStagedAddObject(stagedObject); @@ -369,7 +375,8 @@ namespace odhtdb } catch (exception &e) { - fprintf(stderr, "Error: Failed to commit, reason: %s\n", e.what()); + // TODO: Add rollback + Log::error("Failed to commit, reason: %s\n", e.what()); } for(const auto &stagedObject : stagedCreateObjects) @@ -395,7 +402,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedCreateObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -407,7 +414,7 @@ namespace odhtdb { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); + Log::warn("Failed to put: %s, what to do?\n", "commitStagedAddObject"); }/* TODO: How to make this work?, time_point(), false*/); } @@ -483,7 +490,7 @@ namespace odhtdb return false; } - void Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) + void Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &requestDataHash, const shared_ptr encryptionKey) { sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; @@ -518,14 +525,14 @@ namespace odhtdb // The user (public key) could belong to a node but we might not have retrieved the node info yet since data may // not be retrieved in order. // Data in quarantine is processed when 'create' packet is received or removed after 60 seconds - databaseStorage.addToQuarantine(creatorPublicKey, creationDate, value->data.data(), value->data.size()); + databaseStorage.addToQuarantine(requestDataHash, creatorPublicKey, creationDate, value->data.data(), value->data.size()); throw RequestQuarantineException(); } auto creatorUser = databaseStorage.getUserByPublicKey(creatorPublicKey); // TODO: Verify there isn't already data with same timestamp for this node. Same for quarantine. // TODO: We might receive 'add' data packet before 'create'. If that happens, we should put it in quarantine and process it later. - databaseStorage.appendStorage(*node, creatorUser, creationDate, value->data.data(), value->data.size()); + databaseStorage.appendStorage(*node, requestDataHash, creatorUser, creationDate, value->data.data(), value->data.size()); if(operation == DatabaseOperation::ADD_DATA) { @@ -551,7 +558,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); + Log::debug("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data); } else if(operation == DatabaseOperation::ADD_USER) { @@ -572,7 +579,7 @@ namespace odhtdb if(group) { auto user = RemoteUser::create(userToAddPublicKey, name, group); - databaseStorage.addUser(user, hash); + databaseStorage.addUser(user, *node); auto creatorUserGroupWithRights = getGroupWithRightsToAddUserToGroup(creatorUser->getGroups(), group); if(!creatorUserGroupWithRights) @@ -589,7 +596,7 @@ namespace odhtdb throw PermissionDeniedException(errMsg); } - printf("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); + Log::debug("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str()); } else { @@ -606,36 +613,36 @@ namespace odhtdb bool Database::listenCreateData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) { - printf("Got create data\n"); + Log::debug("Got create data\n"); try { if(databaseStorage.getStorage(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); DatabaseCreateRequest createObject = deserializeCreateRequest(value, hash, encryptionKey); - printf("Got create object, name: %s\n", createObject.name.c_str()); + Log::debug("Got create object, name: %s\n", createObject.name.c_str()); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'create' request: %s\n", e.what()); } return true; } - bool Database::listenAddData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) + bool Database::listenAddData(std::shared_ptr value, const Hash &requestDataHash, const shared_ptr encryptionKey) { - printf("Got add data\n"); + Log::debug("Got add data\n"); try { - deserializeAddRequest(value, hash, encryptionKey); - //printf("Got add object, timestamp: %zu\n", addObject.timestamp); + deserializeAddRequest(value, requestDataHash, encryptionKey); + //Log::debug("Got add object, timestamp: %zu\n", addObject.timestamp); } catch (RequestQuarantineException &e) { - fprintf(stderr, "Warning: Request was put in quarantine, will be processed later"); + Log::warn("Request was put in quarantine, will be processed later"); } catch (exception &e) { - fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what()); + Log::warn("Failed to deserialize 'add' request: %s\n", e.what()); } return true; } -- cgit v1.2.3