aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
m---------depends/sibs-pubsub0
-rw-r--r--include/odhtdb/Database.hpp6
-rw-r--r--src/Database.cpp59
-rw-r--r--tests/main.cpp75
4 files changed, 82 insertions, 58 deletions
diff --git a/depends/sibs-pubsub b/depends/sibs-pubsub
-Subproject f0a2053c2b44298a8cf1aacbafb29870ed521aa
+Subproject 4fffc1cd8ccf1c3b1f15b2cb1942daf8cdd556a
diff --git a/include/odhtdb/Database.hpp b/include/odhtdb/Database.hpp
index a326398..61365a0 100644
--- a/include/odhtdb/Database.hpp
+++ b/include/odhtdb/Database.hpp
@@ -162,7 +162,7 @@ namespace odhtdb
DISABLE_COPY(Database)
friend class DatabaseStorage;
public:
- Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs);
+ static std::future<std::unique_ptr<Database>> connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs);
~Database();
// Safe to call multiple times with same node hash, will be ignored if the node is already beeing seeded
@@ -212,13 +212,15 @@ namespace odhtdb
static InfoHash getInfoHash(const void *data, usize size);
private:
+ Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs);
+
bool sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size);
void deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey);
void deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey);
bool listenCreateData(const void *data, const usize size, const Hash &hash, const std::shared_ptr<OwnedByteArray> encryptionKey);
bool listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const std::shared_ptr<OwnedByteArray> encryptionKey);
private:
- sibs::BootstrapConnection bootstrapConnection;
+ std::unique_ptr<sibs::BootstrapConnection> bootstrapConnection;
DatabaseStorage databaseStorage;
std::function<void(const DatabaseCreateNodeRequest&)> onCreateNodeCallbackFunc;
std::function<void(const DatabaseAddNodeRequest&)> onAddNodeCallbackFunc;
diff --git a/src/Database.cpp b/src/Database.cpp
index 4422e1c..d3cf606 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -85,8 +85,8 @@ namespace odhtdb
return hash;
}
- Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
- bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)),
+ Database::Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
+ bootstrapConnection(sibs::BootstrapConnection::connect(bootstrapNodeAddr).get()),
databaseStorage(this, storageDir),
onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc),
onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc),
@@ -165,7 +165,7 @@ namespace odhtdb
return;
}
- databaseStorage.setRemotePeers(bootstrapConnection.getPeers());
+ databaseStorage.setRemotePeers(bootstrapConnection->getPeers());
saveIntervalMs = 30000; // 30 sec
}
});
@@ -178,6 +178,25 @@ namespace odhtdb
shuttingDown = true;
remoteNodesSaveThread.join();
}
+
+ std::future<std::unique_ptr<Database>> Database::connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs)
+ {
+ std::promise<std::unique_ptr<Database>> connectionPromise;
+ std::future<std::unique_ptr<Database>> connectionFuture = connectionPromise.get_future();
+ std::thread([](std::promise<std::unique_ptr<Database>> connectionPromise, sibs::Ipv4 bootstrapNodeAddr, const boost::filesystem::path storageDir, DatabaseCallbackFuncs callbackFuncs)
+ {
+ try
+ {
+ Database *database = new Database(bootstrapNodeAddr, storageDir, callbackFuncs);
+ connectionPromise.set_value(std::unique_ptr<Database>(database));
+ }
+ catch(...)
+ {
+ connectionPromise.set_exception(std::current_exception());
+ }
+ }, std::move(connectionPromise), sibs::Ipv4(bootstrapNodeAddr, port), storageDir, callbackFuncs).detach();
+ return connectionFuture;
+ }
struct ActionGap
{
@@ -208,7 +227,7 @@ namespace odhtdb
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
Log::debug("Request: Sent create packet to requesting peer");
- bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
});
}
@@ -255,7 +274,7 @@ namespace odhtdb
}
if(!sendData) return;
- bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
this_thread::sleep_for(chrono::milliseconds(50));
}, fetchOrder);
return true;
@@ -281,7 +300,7 @@ namespace odhtdb
Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
DhtKey dhtKey(*nodeToSeed.getRequestHash());
- newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.newDataListenHandle = bootstrapConnection->listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -300,7 +319,7 @@ namespace odhtdb
newSeedInfo.reponseKeyInfoHash = responseKeyShared;
// TODO: If this response key is spammed, generate a new one.
- newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.responseKeyListenHandle = bootstrapConnection->listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -314,7 +333,7 @@ namespace odhtdb
// TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
- newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.requestOldDataListenHandle = bootstrapConnection->listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -365,7 +384,7 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -374,9 +393,9 @@ namespace odhtdb
if(seedInfoIt != seedInfoMap.end())
{
DhtKey dhtKey(nodeHash);
- bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle);
- bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle);
- bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.newDataListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.requestOldDataListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.responseKeyListenHandle);
seedInfoMap.erase(seedInfoIt);
}
}
@@ -415,7 +434,7 @@ namespace odhtdb
seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
DhtKey dhtKey(*hashRequestKey);
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -447,7 +466,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -475,7 +494,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc()
@@ -648,12 +667,12 @@ namespace odhtdb
InfoHash responseKey = receiveMessageKey;
++responseKey[0];
- return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ return bootstrapConnection->listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
sibs::SafeSerializer serializer = callbackFunc(data, size);
if(!serializer.getBuffer().empty())
{
- bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
return true;
});
@@ -661,7 +680,7 @@ namespace odhtdb
void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size)
{
- bootstrapConnection.put(requestKey.getKey(), data, size);
+ bootstrapConnection->put(requestKey.getKey(), data, size);
}
sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc)
@@ -669,7 +688,7 @@ namespace odhtdb
InfoHash responseKey = requestKey;
++responseKey[0];
- auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ auto listener = bootstrapConnection->listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
return callbackFunc(true, data, size);
});
@@ -680,7 +699,7 @@ namespace odhtdb
void Database::cancelNodeListener(sibs::ListenHandle &nodeListener)
{
- bootstrapConnection.cancelListen(nodeListener);
+ bootstrapConnection->cancelListen(nodeListener);
}
int Database::clearCache()
diff --git a/tests/main.cpp b/tests/main.cpp
index 988309c..760e7a9 100644
--- a/tests/main.cpp
+++ b/tests/main.cpp
@@ -112,11 +112,11 @@ static void testEncryption()
assertEquals(0, strncmp(message, (const char*)decryption.getDecryptedText().data, messageLength));
}
-static void testTimestamp(const Database &database)
+static void testTimestamp(const std::unique_ptr<Database> &database)
{
- auto timestamp1 = database.getSyncedTimestampUtc();
+ auto timestamp1 = database->getSyncedTimestampUtc();
this_thread::sleep_for(chrono::milliseconds(100));
- auto timestamp2 = database.getSyncedTimestampUtc();
+ auto timestamp2 = database->getSyncedTimestampUtc();
if(timestamp2.getCombined() > timestamp1.getCombined())
Log::debug("Second timestamp is more than first one, as expected");
@@ -163,15 +163,15 @@ static void testStandard()
Signature::KeyPair localUserKeyPair;
testSignData(localUserKeyPair);
- Database database("127.0.0.1", PORT, storagePath, callbackFuncs);
+ std::unique_ptr<Database> database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get();
testTimestamp(database);
- auto databaseCreateResponse = database.create();
+ auto databaseCreateResponse = database->create();
databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() };
auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair();
- database.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 });
- database.addUser(databaseNode, *adminUserKey, localUserKeyPair.getPublicKey(), databaseCreateResponse->getNodeAdminGroupId()->getView());
- database.addData(databaseNode, localUserKeyPair, DataView{ (void*)"hello, aaald!", 13 });
+ database->addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 });
+ database->addUser(databaseNode, *adminUserKey, localUserKeyPair.getPublicKey(), databaseCreateResponse->getNodeAdminGroupId()->getView());
+ database->addData(databaseNode, localUserKeyPair, DataView{ (void*)"hello, aaald!", 13 });
this_thread::sleep_for(chrono::seconds(3));
assertEquals(1, createNodeCounter);
@@ -180,13 +180,13 @@ static void testStandard()
string username = "dec05eba";
string password = "secretPassword";
- if(database.doesStoredUserExist(username))
+ if(database->doesStoredUserExist(username))
fail("Expected stored to not exist until it has been added");
- database.storeNodeInfoForUserEncrypted(databaseNode, username, password, *adminUserKey);
+ database->storeNodeInfoForUserEncrypted(databaseNode, username, password, *adminUserKey);
try
{
- database.storeNodeInfoForUserEncrypted(databaseNode, username, password, localUserKeyPair);
+ database->storeNodeInfoForUserEncrypted(databaseNode, username, password, localUserKeyPair);
fail("Expected store user password to fail since we have already stored an user in the node");
}
catch(SqlExecException &e)
@@ -194,10 +194,10 @@ static void testStandard()
Log::debug("Failed with sql exception as expected, since we already have an user in the node: %s", e.what());
}
- if(!database.doesStoredUserExist(username))
+ if(!database->doesStoredUserExist(username))
fail("Expected stored to exist after it has been added");
- auto nodeUserData = database.getStoredNodeUserInfoDecrypted(username, password);
+ auto nodeUserData = database->getStoredNodeUserInfoDecrypted(username, password);
assertEquals((size_t)1, nodeUserData.size());
auto userDataIt = nodeUserData.find(*databaseNode.getRequestHash());
if(userDataIt == nodeUserData.end())
@@ -211,7 +211,7 @@ static void testStandard()
try
{
- database.storeUserWithoutNodes(username, password);
+ database->storeUserWithoutNodes(username, password);
fail("Expected store user to fail since the user already exists in database");
}
catch(SqlExecException &e)
@@ -219,17 +219,20 @@ static void testStandard()
}
- database.storeUserWithoutNodes("anotherUser", password);
- if(!database.doesStoredUserExist("anotherUser"))
+ database->storeUserWithoutNodes("anotherUser", password);
+ if(!database->doesStoredUserExist("anotherUser"))
fail("Added user 'anotherUser' to database without any nodes, but it doesn't seem to be stored");
- auto adminUserGroups = database.getUserGroups(*databaseNode.getRequestHash(), adminUserKey->getPublicKey());
+ auto adminUserGroups = database->getUserGroups(*databaseNode.getRequestHash(), adminUserKey->getPublicKey());
if(adminUserGroups.size() != 1 || adminUserGroups[0].getView() != databaseCreateResponse->getNodeAdminGroupId()->getView())
fail("Admin group doesn't match group stored in database");
- auto userGroups = database.getUserGroups(*databaseNode.getRequestHash(), localUserKeyPair.getPublicKey());
+ auto userGroups = database->getUserGroups(*databaseNode.getRequestHash(), localUserKeyPair.getPublicKey());
if(userGroups.size() != 1 || userGroups[0].getView() != databaseCreateResponse->getNodeAdminGroupId()->getView())
fail("User group doesn't match group stored in database");
+
+ // Give time for us to disconnect
+ std::this_thread::sleep_for(std::chrono::seconds(5));
}
Log::debug("Callback works when adding data while connected, now testing to reconnect and check if data remains...");
{
@@ -237,10 +240,10 @@ static void testStandard()
addDataCounter = 0;
addUserCounter = 0;
- Database database("127.0.0.1", PORT, storagePath, callbackFuncs);
- database.loadNode(*databaseNode.getRequestHash());
+ std::unique_ptr<Database> database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get();
+ database->loadNode(*databaseNode.getRequestHash());
- database.seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
+ database->seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
this_thread::sleep_for(chrono::seconds(3));
assertEquals(1, createNodeCounter);
@@ -252,7 +255,7 @@ static void testStandard()
messageToSendSerializer.add((u32)10);
u32 receivedNumber = 0;
- database.receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size)
+ database->receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size)
{
sibs::SafeDeserializer deserializer((const u8*)data, size);
receivedNumber = deserializer.extract<u32>();
@@ -262,7 +265,7 @@ static void testStandard()
});
u32 sendCustomMessageResponseNumber = 0;
- database.sendCustomMessage(customMessageKey, messageToSendSerializer.getBuffer().data(), messageToSendSerializer.getBuffer().size(), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size)
+ database->sendCustomMessage(customMessageKey, messageToSendSerializer.getBuffer().data(), messageToSendSerializer.getBuffer().size(), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size)
{
if(!gotResponse)
{
@@ -309,14 +312,14 @@ static void testTwoLocalNodes()
DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback };
- Database database1("127.0.0.1", PORT, storagePath1, callbackFuncs);
- auto databaseCreateResponse = database1.create();
+ std::unique_ptr<Database> database1 = Database::connect("127.0.0.1", PORT, storagePath1, callbackFuncs).get();
+ auto databaseCreateResponse = database1->create();
DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() };
auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair();
- database1.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 });
+ database1->addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 });
- Database database2("127.0.0.1", PORT, storagePath2, callbackFuncs);
- database2.seed(databaseNode);
+ std::unique_ptr<Database> database2 = Database::connect("127.0.0.1", PORT, storagePath2, callbackFuncs).get();
+ database2->seed(databaseNode);
this_thread::sleep_for(chrono::seconds(5));
}
@@ -344,8 +347,8 @@ static void testMemoryUsage()
DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback };
- Database database("127.0.0.1", PORT, storagePath, callbackFuncs);
- auto databaseCreateResponse = database.create();
+ std::unique_ptr<Database> database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get();
+ auto databaseCreateResponse = database->create();
DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() };
auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair();
@@ -356,7 +359,7 @@ static void testMemoryUsage()
for(int i = 0; i < iterations; ++i)
{
Log::debug("Memory usage test %d/%d", 1 + i, iterations);
- database.addData(databaseNode, *adminUserKey, DataView{ (void*)msg, msgLength });
+ database->addData(databaseNode, *adminUserKey, DataView{ (void*)msg, msgLength });
this_thread::sleep_for(chrono::milliseconds(250));
}
}
@@ -400,22 +403,22 @@ static void testStoreAccount()
{
DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback };
- Database database("127.0.0.1", PORT, storagePath, callbackFuncs);
+ std::unique_ptr<Database> database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get();
std::string username = "username";
std::string password = "password";
- database.storeUserWithoutNodes(username, password);
- database.getStoredNodeUserInfoDecrypted(username, password);
+ database->storeUserWithoutNodes(username, password);
+ database->getStoredNodeUserInfoDecrypted(username, password);
}
{
DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback };
- Database database("127.0.0.1", PORT, storagePath, callbackFuncs);
+ std::unique_ptr<Database> database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get();
std::string username = "username";
std::string password = "password";
- database.getStoredNodeUserInfoDecrypted(username, password);
+ database->getStoredNodeUserInfoDecrypted(username, password);
}
}