author     Aleksi Lindeman <0xdec05eba@gmail.com>  2018-03-13 03:03:11 +0100
committer  Aleksi Lindeman <0xdec05eba@gmail.com>  2018-03-13 03:06:45 +0100
commit     f84aba861d26601ae4c7203daa39752e7c95cfd8 (patch)
tree       2b39728d07fb370e87b05ba2fb63958e5ad8fb28 /src
parent     1328d943c5016dd1662a4e46d4a408bca010cffc (diff)
Fix add data operation not working correctly
Reminder: do not get a reference to a hash map value... duh.
Add thread-safe logging (the log is in order now!).
Store data immediately to the database when WE add it, instead of waiting for a response from remote peers.
TODO: Test with multiple peers (not only localhost).
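The hash map reminder above refers to the class of bug where code holds a pointer or reference to a value stored inside a map and keeps using it after the entry is mutated or erased. The exact call site is not shown in this diff, so the sketch below is a hypothetical illustration of that pitfall (the container and names are made up, not from this repository); the related fix in this commit is that userPublicKeyNodeMap now stores heap-allocated Hash objects whose lifetime does not depend on the map entry.

    #include <cstdio>
    #include <string>
    #include <unordered_map>

    int main()
    {
        std::unordered_map<std::string, std::string> nodeByUser;
        nodeByUser["alice"] = "node-1";

        // Risky: keep a long-lived pointer to the mapped value.
        const std::string *node = &nodeByUser.find("alice")->second;

        // Erasing the entry destroys the value, so 'node' now dangles.
        nodeByUser.erase("alice");
        // std::printf("%s\n", node->c_str()); // undefined behaviour if uncommented

        // Safer: copy the value out, or store independently owned objects
        // (as this commit does by keeping Hash* values in userPublicKeyNodeMap).
        return 0;
    }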
Diffstat (limited to 'src')
-rw-r--r--  src/Database.cpp         75
-rw-r--r--  src/DatabaseStorage.cpp  31
-rw-r--r--  src/Log.cpp              40
-rw-r--r--  src/User.cpp              7
4 files changed, 107 insertions, 46 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index fc1a69b..6f53f1d 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -5,6 +5,7 @@
#include "../include/Encryption.hpp"
#include "../include/DhtKey.hpp"
#include "../include/bin2hex.hpp"
+#include "../include/Log.hpp"
#include <boost/uuid/uuid_generators.hpp>
#include <opendht.h>
#include <fmt/format.h>
@@ -150,12 +151,12 @@ namespace odhtdb
// (only if there are plenty of other seeders for the cached files. This could also cause race issue
// where all nodes with a cached file delete it at same time)
- printf("Seeding key: %s\n", hash->toString().c_str());
+ Log::debug("Seeding key: %s\n", hash->toString().c_str());
DhtKey dhtKey(*hash);
node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr<Value> &value)
{
- printf("Seed: New data listener received data...\n");
+ Log::debug("Seed: New data listener received data...\n");
const Hash requestHash(value->data.data(), value->data.size());
if(requestHash == *hash)
return true;
@@ -181,7 +182,7 @@ namespace odhtdb
// This is to prevent too many peers from responding to a request to get old data.
node.listen(dhtKey.getRequestOldDataKey(), [this, hash](const shared_ptr<Value> &value)
{
- printf("Request: Got request to send old data\n");
+ Log::debug("Request: Got request to send old data\n");
try
{
sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
@@ -192,32 +193,34 @@ namespace odhtdb
auto requestedData = databaseStorage.getStorage(*hash);
if(!requestedData)
{
- fprintf(stderr, "Warning: No data found for hash %s, unable to serve peer\n", hash->toString().c_str());
+ Log::warn("No data found for hash %s, unable to serve peer\n", hash->toString().c_str());
return true;
}
+ InfoHash requestResponseInfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN);
+
if(dataStartTimestamp == 0)
{
- printf("Request: Sent create packet to requesting peer\n");
- node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok)
+ Log::debug("Request: Sent create packet to requesting peer\n");
+ node.put(requestResponseInfoHash, Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok)
{
if(!ok)
- fprintf(stderr, "Failed to put response for old data for 'create' data\n");
+ Log::warn("Failed to put response for old data for 'create' data\n");
});
}
for(auto requestedObject : requestedData->objects)
{
- node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok)
+ node.put(requestResponseInfoHash, Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok)
{
if(!ok)
- fprintf(stderr, "Failed to put response for old data for 'add' data\n");
+ Log::warn("Failed to put response for old data for 'add' data\n");
});
}
}
catch (sibs::DeserializeException &e)
{
- fprintf(stderr, "Warning: Failed to deserialize 'get old data' request: %s\n", e.what());
+ Log::warn("Failed to deserialize 'get old data' request: %s\n", e.what());
}
return true;
});
@@ -228,7 +231,7 @@ namespace odhtdb
node.put(dhtKey.getRequestOldDataKey(), Value(serializer.getBuffer().data(), serializer.getBuffer().size()), [](bool ok)
{
if(!ok)
- fprintf(stderr, "Failed to put request to get old data\n");
+ Log::warn("Failed to put request to get old data\n");
});
//node.listen(CREATE_DATA_HASH, bind(&Database::listenCreateData, this, _1));
@@ -295,7 +298,8 @@ namespace odhtdb
string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData);
free(requestData.data);
DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData);
- // TODO: Add add object to database storage here for local user
+ Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size);
stagedAddObjects.emplace_back(make_unique<StagedObject>(stagedAddObject, nodeInfo.getRequestHash()));
}
@@ -344,24 +348,26 @@ namespace odhtdb
DataView requestData { serializer.getBuffer().data(), serializer.getBuffer().size() };
string signedRequestData = userToPerformActionWith->getPrivateKey().sign(requestData);
DataView stagedAddObject = combine(userToPerformActionWith->getPublicKey(), signedRequestData);
- // TODO: Add add object to database storage here for local user
+ Hash requestDataHash(stagedAddObject.data, stagedAddObject.size);
+ databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, userToPerformActionWith, timestampMicroseconds, (u8*)stagedAddObject.data, stagedAddObject.size);
+ auto userToAdd = RemoteUser::create(userToAddPublicKey, userToAddName, groupToAddUserTo);
+ databaseStorage.addUser(userToAdd, *nodeInfo.getRequestHash());
stagedAddObjects.emplace_back(make_unique<StagedObject>(stagedAddObject, nodeInfo.getRequestHash()));
}
void Database::commit()
{
// TODO: Combine staged objects into one object for efficiency.
- // TODO: Add rollback
try
{
- printf("Num objects to create: %zu\n", stagedCreateObjects.size());
+ Log::debug("Num objects to create: %zu\n", stagedCreateObjects.size());
for(const auto &stagedObject : stagedCreateObjects)
{
commitStagedCreateObject(stagedObject);
}
- printf("Num objects to add: %zu\n", stagedAddObjects.size());
+ Log::debug("Num objects to add: %zu\n", stagedAddObjects.size());
for(const auto &stagedObject : stagedAddObjects)
{
commitStagedAddObject(stagedObject);
@@ -369,7 +375,8 @@ namespace odhtdb
}
catch (exception &e)
{
- fprintf(stderr, "Error: Failed to commit, reason: %s\n", e.what());
+ // TODO: Add rollback
+ Log::error("Failed to commit, reason: %s\n", e.what());
}
for(const auto &stagedObject : stagedCreateObjects)
@@ -395,7 +402,7 @@ namespace odhtdb
{
// TODO: Handle failure to put data
if(!ok)
- fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject");
+ Log::warn("Failed to put: %s, what to do?\n", "commitStagedCreateObject");
}/* TODO: How to make this work?, time_point(), false*/);
}
@@ -407,7 +414,7 @@ namespace odhtdb
{
// TODO: Handle failure to put data
if(!ok)
- fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject");
+ Log::warn("Failed to put: %s, what to do?\n", "commitStagedAddObject");
}/* TODO: How to make this work?, time_point(), false*/);
}
@@ -483,7 +490,7 @@ namespace odhtdb
return false;
}
- void Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<char*> encryptionKey)
+ void Database::deserializeAddRequest(const std::shared_ptr<dht::Value> &value, const Hash &requestDataHash, const shared_ptr<char*> encryptionKey)
{
sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
@@ -518,14 +525,14 @@ namespace odhtdb
// The user (public key) could belong to a node but we might not have retrieved the node info yet since data may
// not be retrieved in order.
// Data in quarantine is processed when 'create' packet is received or removed after 60 seconds
- databaseStorage.addToQuarantine(creatorPublicKey, creationDate, value->data.data(), value->data.size());
+ databaseStorage.addToQuarantine(requestDataHash, creatorPublicKey, creationDate, value->data.data(), value->data.size());
throw RequestQuarantineException();
}
auto creatorUser = databaseStorage.getUserByPublicKey(creatorPublicKey);
// TODO: Verify there isn't already data with same timestamp for this node. Same for quarantine.
// TODO: We might receive 'add' data packet before 'create'. If that happens, we should put it in quarantine and process it later.
- databaseStorage.appendStorage(*node, creatorUser, creationDate, value->data.data(), value->data.size());
+ databaseStorage.appendStorage(*node, requestDataHash, creatorUser, creationDate, value->data.data(), value->data.size());
if(operation == DatabaseOperation::ADD_DATA)
{
@@ -551,7 +558,7 @@ namespace odhtdb
throw PermissionDeniedException(errMsg);
}
- printf("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data);
+ Log::debug("Got add object, timestamp: %zu, data: %.*s\n", creationDate, decryptedBody.getDecryptedText().size, decryptedBody.getDecryptedText().data);
}
else if(operation == DatabaseOperation::ADD_USER)
{
@@ -572,7 +579,7 @@ namespace odhtdb
if(group)
{
auto user = RemoteUser::create(userToAddPublicKey, name, group);
- databaseStorage.addUser(user, hash);
+ databaseStorage.addUser(user, *node);
auto creatorUserGroupWithRights = getGroupWithRightsToAddUserToGroup(creatorUser->getGroups(), group);
if(!creatorUserGroupWithRights)
@@ -589,7 +596,7 @@ namespace odhtdb
throw PermissionDeniedException(errMsg);
}
- printf("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str());
+ Log::debug("Got add user object, timestamp: %zu, user added: %.*s\n", creationDate, nameLength, name.c_str());
}
else
{
@@ -606,36 +613,36 @@ namespace odhtdb
bool Database::listenCreateData(std::shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<char*> encryptionKey)
{
- printf("Got create data\n");
+ Log::debug("Got create data\n");
try
{
if(databaseStorage.getStorage(hash))
throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
DatabaseCreateRequest createObject = deserializeCreateRequest(value, hash, encryptionKey);
- printf("Got create object, name: %s\n", createObject.name.c_str());
+ Log::debug("Got create object, name: %s\n", createObject.name.c_str());
}
catch (exception &e)
{
- fprintf(stderr, "Warning: Failed to deserialize 'create' request: %s\n", e.what());
+ Log::warn("Failed to deserialize 'create' request: %s\n", e.what());
}
return true;
}
- bool Database::listenAddData(std::shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<char*> encryptionKey)
+ bool Database::listenAddData(std::shared_ptr<dht::Value> value, const Hash &requestDataHash, const shared_ptr<char*> encryptionKey)
{
- printf("Got add data\n");
+ Log::debug("Got add data\n");
try
{
- deserializeAddRequest(value, hash, encryptionKey);
- //printf("Got add object, timestamp: %zu\n", addObject.timestamp);
+ deserializeAddRequest(value, requestDataHash, encryptionKey);
+ //Log::debug("Got add object, timestamp: %zu\n", addObject.timestamp);
}
catch (RequestQuarantineException &e)
{
- fprintf(stderr, "Warning: Request was put in quarantine, will be processed later");
+ Log::warn("Request was put in quarantine, will be processed later");
}
catch (exception &e)
{
- fprintf(stderr, "Warning: Failed to deserialize 'add' request: %s\n", e.what());
+ Log::warn("Failed to deserialize 'add' request: %s\n", e.what());
}
return true;
}
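The Database.cpp changes above all sit inside OpenDHT's put/listen pattern: a peer listens on a derived key and answers a request by putting a value on a response key. A stripped-down sketch of that pattern, mirroring the callback shapes used in this diff (the keys, payload, and bootstrap host below are placeholders, not the real odhtdb keys):

    #include <opendht.h>
    #include <chrono>
    #include <cstdio>
    #include <thread>

    int main()
    {
        dht::DhtRunner node;
        node.run(4222, dht::crypto::generateIdentity(), true);
        node.bootstrap("bootstrap.jami.net", "4222");

        dht::InfoHash requestKey = dht::InfoHash::get("example-request-key");
        dht::InfoHash responseKey = dht::InfoHash::get("example-response-key");

        // Serve requests: same callback shape as the node.listen calls above.
        node.listen(requestKey, [&node, responseKey](const std::shared_ptr<dht::Value> &value)
        {
            std::printf("Got request (%zu bytes), sending response\n", value->data.size());
            node.put(responseKey, dht::Value((const uint8_t*)"hello", 5), [](bool ok)
            {
                if(!ok)
                    std::printf("Failed to put response\n");
            });
            return true; // keep listening
        });

        // A requesting peer would node.put(requestKey, ...) and node.listen(responseKey, ...).
        // Keep the node alive long enough to serve requests, then shut it down.
        std::this_thread::sleep_for(std::chrono::minutes(10));
        node.join();
        return 0;
    }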
diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp
index 7c86d18..0f75d1f 100644
--- a/src/DatabaseStorage.cpp
+++ b/src/DatabaseStorage.cpp
@@ -47,33 +47,52 @@ namespace odhtdb
}
}
- void DatabaseStorage::appendStorage(const Hash &hash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize)
+ void DatabaseStorage::appendStorage(const Hash &nodeHash, const Hash &dataHash, const User *creatorUser, u64 timestamp, const u8 *data, usize dataSize)
{
- auto it = storageMap.find(hash);
+ auto it = storageMap.find(nodeHash);
if(it == storageMap.end())
{
string errMsg = "Database storage with hash ";
- errMsg += hash.toString();
+ errMsg += nodeHash.toString();
errMsg += " not found. Storage for a hash needs to be created before data can be appended to it";
throw DatabaseStorageNotFound(errMsg);
}
+ auto storedDataIt = storedDataHash.find(dataHash);
+ if(storedDataIt != storedDataHash.end())
+ {
+ string errMsg = "Database already contains data with hash: ";
+ errMsg += dataHash.toString();
+ throw DatabaseStorageAlreadyExists(errMsg);
+ }
+
DataView storageData { new u8[dataSize], dataSize };
+ memcpy(storageData.data, data, dataSize);
DatabaseStorageObject *databaseStorageObject = new DatabaseStorageObject(storageData, timestamp, creatorUser->getPublicKey());
it->second->objects.push_back(databaseStorageObject);
+ storedDataHash.insert(dataHash);
}
- void DatabaseStorage::addToQuarantine(const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize)
+ void DatabaseStorage::addToQuarantine(const Hash &dataHash, const Signature::PublicKey &creatorPublicKey, u64 timestamp, const u8 *data, usize dataSize)
{
+ auto storedDataIt = storedDataHash.find(dataHash);
+ if(storedDataIt != storedDataHash.end())
+ {
+ string errMsg = "Database already contains data with hash: ";
+ errMsg += dataHash.toString();
+ throw DatabaseStorageAlreadyExists(errMsg);
+ }
+
DataView storageData { new u8[dataSize], dataSize };
memcpy(storageData.data, data, dataSize);
DatabaseStorageQuarantineObject *databaseQuarantineStorageObject = new DatabaseStorageQuarantineObject(storageData, timestamp, creatorPublicKey);
quarantineStorageMap[creatorPublicKey].emplace_back(databaseQuarantineStorageObject);
+ storedDataHash.insert(dataHash);
}
void DatabaseStorage::addUser(User *user, const Hash &hash)
{
- userPublicKeyNodeMap[user->getPublicKey()] = hash;
+ userPublicKeyNodeMap[user->getPublicKey()] = new Hash(hash);
publicKeyUserMap[user->getPublicKey()] = user;
}
@@ -89,7 +108,7 @@ namespace odhtdb
{
auto it = userPublicKeyNodeMap.find(userPublicKey);
if(it != userPublicKeyNodeMap.end())
- return &it->second;
+ return it->second;
return nullptr;
}
diff --git a/src/Log.cpp b/src/Log.cpp
new file mode 100644
index 0000000..5d77d73
--- /dev/null
+++ b/src/Log.cpp
@@ -0,0 +1,40 @@
+#include "../include/Log.hpp"
+#include <cstdarg>
+#include <mutex>
+
+static std::mutex mutexDebug;
+
+namespace odhtdb
+{
+ // TODO: Add color (if output is tty)?
+
+ void Log::debug(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexDebug);
+ va_list args;
+ va_start(args, fmt);
+ fputs("Debug: ", stdout);
+ vfprintf(stdout, fmt, args);
+ va_end(args);
+ }
+
+ void Log::warn(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexDebug);
+ va_list args;
+ va_start(args, fmt);
+ fputs("Warning: ", stdout);
+ vfprintf(stdout, fmt, args);
+ va_end(args);
+ }
+
+ void Log::error(const char *fmt, ...)
+ {
+ std::lock_guard<std::mutex> lock(mutexDebug);
+ va_list args;
+ va_start(args, fmt);
+ fputs("Error: ", stderr);
+ vfprintf(stderr, fmt, args);
+ va_end(args);
+ }
+}
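Usage of the new Log API is printf-style; Log::debug and Log::warn write to stdout, Log::error writes to stderr, and all three share the same mutex, so concurrent calls from different threads no longer interleave mid-line (the "log is in order now" note in the commit message). A small usage example (the function and arguments are illustrative):

    #include "../include/Log.hpp"

    void reportPeer(const char *peerName, size_t numBytes)
    {
        odhtdb::Log::debug("Received %zu bytes from %s\n", numBytes, peerName);
        if(numBytes == 0)
            odhtdb::Log::warn("Peer %s sent no data\n", peerName);
    }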
diff --git a/src/User.cpp b/src/User.cpp
index e2017ff..4ec93cd 100644
--- a/src/User.cpp
+++ b/src/User.cpp
@@ -7,12 +7,7 @@ namespace odhtdb
{
if(name.size() > 255)
throw UserNameTooLongException(name);
-
- if(group)
- {
- groups.emplace_back(group);
- group->addUser(this);
- }
+ addToGroup(group);
}
void User::addToGroup(Group *group)
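For reference, the constructor change in User.cpp replaces the inline group handling with a call to addToGroup. Its body is not shown in this diff; based on the removed lines, it presumably looks roughly like this (a hedged reconstruction, with the null check kept from the old code):

    void User::addToGroup(Group *group)
    {
        if(!group)
            return;
        groups.emplace_back(group);
        group->addUser(this);
    }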