From 22abf1aa83b2668f918556491a55947be798f89f Mon Sep 17 00:00:00 2001 From: dec05eba <0xdec05eba@gmail.com> Date: Sun, 21 Oct 2018 14:31:28 +0200 Subject: Async connect --- depends/sibs-pubsub | 2 +- include/odhtdb/Database.hpp | 6 ++-- src/Database.cpp | 59 +++++++++++++++++++++++------------ tests/main.cpp | 75 +++++++++++++++++++++++---------------------- 4 files changed, 83 insertions(+), 59 deletions(-) diff --git a/depends/sibs-pubsub b/depends/sibs-pubsub index f0a2053..4fffc1c 160000 --- a/depends/sibs-pubsub +++ b/depends/sibs-pubsub @@ -1 +1 @@ -Subproject commit f0a2053c2b44298a8cf1aacbafb29870ed521aa0 +Subproject commit 4fffc1cd8ccf1c3b1f15b2cb1942daf8cdd556ab diff --git a/include/odhtdb/Database.hpp b/include/odhtdb/Database.hpp index a326398..61365a0 100644 --- a/include/odhtdb/Database.hpp +++ b/include/odhtdb/Database.hpp @@ -162,7 +162,7 @@ namespace odhtdb DISABLE_COPY(Database) friend class DatabaseStorage; public: - Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs); + static std::future> connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs); ~Database(); // Safe to call multiple times with same node hash, will be ignored if the node is already beeing seeded @@ -212,13 +212,15 @@ namespace odhtdb static InfoHash getInfoHash(const void *data, usize size); private: + Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs); + bool sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size); void deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const std::shared_ptr encryptionKey); void deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr &nodeHash, const std::shared_ptr encryptionKey); bool listenCreateData(const void *data, const usize size, const Hash &hash, const std::shared_ptr encryptionKey); bool listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr nodeHash, const std::shared_ptr encryptionKey); private: - sibs::BootstrapConnection bootstrapConnection; + std::unique_ptr bootstrapConnection; DatabaseStorage databaseStorage; std::function onCreateNodeCallbackFunc; std::function onAddNodeCallbackFunc; diff --git a/src/Database.cpp b/src/Database.cpp index 4422e1c..d3cf606 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -85,8 +85,8 @@ namespace odhtdb return hash; } - Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : - bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)), + Database::Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : + bootstrapConnection(sibs::BootstrapConnection::connect(bootstrapNodeAddr).get()), databaseStorage(this, storageDir), onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc), onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc), @@ -165,7 +165,7 @@ namespace odhtdb return; } - databaseStorage.setRemotePeers(bootstrapConnection.getPeers()); + databaseStorage.setRemotePeers(bootstrapConnection->getPeers()); saveIntervalMs = 30000; // 30 sec } }); @@ -178,6 +178,25 @@ namespace odhtdb shuttingDown = true; remoteNodesSaveThread.join(); } + + std::future> Database::connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) + { + std::promise> connectionPromise; + std::future> connectionFuture = connectionPromise.get_future(); + std::thread([](std::promise> connectionPromise, sibs::Ipv4 bootstrapNodeAddr, const boost::filesystem::path storageDir, DatabaseCallbackFuncs callbackFuncs) + { + try + { + Database *database = new Database(bootstrapNodeAddr, storageDir, callbackFuncs); + connectionPromise.set_value(std::unique_ptr(database)); + } + catch(...) + { + connectionPromise.set_exception(std::current_exception()); + } + }, std::move(connectionPromise), sibs::Ipv4(bootstrapNodeAddr, port), storageDir, callbackFuncs).detach(); + return connectionFuture; + } struct ActionGap { @@ -208,7 +227,7 @@ namespace odhtdb databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { Log::debug("Request: Sent create packet to requesting peer"); - bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); }); } @@ -255,7 +274,7 @@ namespace odhtdb } if(!sendData) return; - bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); this_thread::sleep_for(chrono::milliseconds(50)); }, fetchOrder); return true; @@ -281,7 +300,7 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.newDataListenHandle = bootstrapConnection->listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -300,7 +319,7 @@ namespace odhtdb newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.responseKeyListenHandle = bootstrapConnection->listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -314,7 +333,7 @@ namespace odhtdb // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.requestOldDataListenHandle = bootstrapConnection->listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -365,7 +384,7 @@ namespace odhtdb } Log::debug("Sending request for old data"); - bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } void Database::stopSeeding(const Hash &nodeHash) @@ -374,9 +393,9 @@ namespace odhtdb if(seedInfoIt != seedInfoMap.end()) { DhtKey dhtKey(nodeHash); - bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle); - bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle); - bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.newDataListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.requestOldDataListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.responseKeyListenHandle); seedInfoMap.erase(seedInfoIt); } } @@ -415,7 +434,7 @@ namespace odhtdb seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); DhtKey dhtKey(*hashRequestKey); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); shared_ptr adminGroupIdResponse = make_shared(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -447,7 +466,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -475,7 +494,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() @@ -648,12 +667,12 @@ namespace odhtdb InfoHash responseKey = receiveMessageKey; ++responseKey[0]; - return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + return bootstrapConnection->listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { sibs::SafeSerializer serializer = callbackFunc(data, size); if(!serializer.getBuffer().empty()) { - bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } return true; }); @@ -661,7 +680,7 @@ namespace odhtdb void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size) { - bootstrapConnection.put(requestKey.getKey(), data, size); + bootstrapConnection->put(requestKey.getKey(), data, size); } sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc) @@ -669,7 +688,7 @@ namespace odhtdb InfoHash responseKey = requestKey; ++responseKey[0]; - auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + auto listener = bootstrapConnection->listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { return callbackFunc(true, data, size); }); @@ -680,7 +699,7 @@ namespace odhtdb void Database::cancelNodeListener(sibs::ListenHandle &nodeListener) { - bootstrapConnection.cancelListen(nodeListener); + bootstrapConnection->cancelListen(nodeListener); } int Database::clearCache() diff --git a/tests/main.cpp b/tests/main.cpp index 988309c..760e7a9 100644 --- a/tests/main.cpp +++ b/tests/main.cpp @@ -112,11 +112,11 @@ static void testEncryption() assertEquals(0, strncmp(message, (const char*)decryption.getDecryptedText().data, messageLength)); } -static void testTimestamp(const Database &database) +static void testTimestamp(const std::unique_ptr &database) { - auto timestamp1 = database.getSyncedTimestampUtc(); + auto timestamp1 = database->getSyncedTimestampUtc(); this_thread::sleep_for(chrono::milliseconds(100)); - auto timestamp2 = database.getSyncedTimestampUtc(); + auto timestamp2 = database->getSyncedTimestampUtc(); if(timestamp2.getCombined() > timestamp1.getCombined()) Log::debug("Second timestamp is more than first one, as expected"); @@ -163,15 +163,15 @@ static void testStandard() Signature::KeyPair localUserKeyPair; testSignData(localUserKeyPair); - Database database("127.0.0.1", PORT, storagePath, callbackFuncs); + std::unique_ptr database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get(); testTimestamp(database); - auto databaseCreateResponse = database.create(); + auto databaseCreateResponse = database->create(); databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); - database.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); - database.addUser(databaseNode, *adminUserKey, localUserKeyPair.getPublicKey(), databaseCreateResponse->getNodeAdminGroupId()->getView()); - database.addData(databaseNode, localUserKeyPair, DataView{ (void*)"hello, aaald!", 13 }); + database->addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); + database->addUser(databaseNode, *adminUserKey, localUserKeyPair.getPublicKey(), databaseCreateResponse->getNodeAdminGroupId()->getView()); + database->addData(databaseNode, localUserKeyPair, DataView{ (void*)"hello, aaald!", 13 }); this_thread::sleep_for(chrono::seconds(3)); assertEquals(1, createNodeCounter); @@ -180,13 +180,13 @@ static void testStandard() string username = "dec05eba"; string password = "secretPassword"; - if(database.doesStoredUserExist(username)) + if(database->doesStoredUserExist(username)) fail("Expected stored to not exist until it has been added"); - database.storeNodeInfoForUserEncrypted(databaseNode, username, password, *adminUserKey); + database->storeNodeInfoForUserEncrypted(databaseNode, username, password, *adminUserKey); try { - database.storeNodeInfoForUserEncrypted(databaseNode, username, password, localUserKeyPair); + database->storeNodeInfoForUserEncrypted(databaseNode, username, password, localUserKeyPair); fail("Expected store user password to fail since we have already stored an user in the node"); } catch(SqlExecException &e) @@ -194,10 +194,10 @@ static void testStandard() Log::debug("Failed with sql exception as expected, since we already have an user in the node: %s", e.what()); } - if(!database.doesStoredUserExist(username)) + if(!database->doesStoredUserExist(username)) fail("Expected stored to exist after it has been added"); - auto nodeUserData = database.getStoredNodeUserInfoDecrypted(username, password); + auto nodeUserData = database->getStoredNodeUserInfoDecrypted(username, password); assertEquals((size_t)1, nodeUserData.size()); auto userDataIt = nodeUserData.find(*databaseNode.getRequestHash()); if(userDataIt == nodeUserData.end()) @@ -211,7 +211,7 @@ static void testStandard() try { - database.storeUserWithoutNodes(username, password); + database->storeUserWithoutNodes(username, password); fail("Expected store user to fail since the user already exists in database"); } catch(SqlExecException &e) @@ -219,17 +219,20 @@ static void testStandard() } - database.storeUserWithoutNodes("anotherUser", password); - if(!database.doesStoredUserExist("anotherUser")) + database->storeUserWithoutNodes("anotherUser", password); + if(!database->doesStoredUserExist("anotherUser")) fail("Added user 'anotherUser' to database without any nodes, but it doesn't seem to be stored"); - auto adminUserGroups = database.getUserGroups(*databaseNode.getRequestHash(), adminUserKey->getPublicKey()); + auto adminUserGroups = database->getUserGroups(*databaseNode.getRequestHash(), adminUserKey->getPublicKey()); if(adminUserGroups.size() != 1 || adminUserGroups[0].getView() != databaseCreateResponse->getNodeAdminGroupId()->getView()) fail("Admin group doesn't match group stored in database"); - auto userGroups = database.getUserGroups(*databaseNode.getRequestHash(), localUserKeyPair.getPublicKey()); + auto userGroups = database->getUserGroups(*databaseNode.getRequestHash(), localUserKeyPair.getPublicKey()); if(userGroups.size() != 1 || userGroups[0].getView() != databaseCreateResponse->getNodeAdminGroupId()->getView()) fail("User group doesn't match group stored in database"); + + // Give time for us to disconnect + std::this_thread::sleep_for(std::chrono::seconds(5)); } Log::debug("Callback works when adding data while connected, now testing to reconnect and check if data remains..."); { @@ -237,10 +240,10 @@ static void testStandard() addDataCounter = 0; addUserCounter = 0; - Database database("127.0.0.1", PORT, storagePath, callbackFuncs); - database.loadNode(*databaseNode.getRequestHash()); + std::unique_ptr database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get(); + database->loadNode(*databaseNode.getRequestHash()); - database.seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); + database->seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); this_thread::sleep_for(chrono::seconds(3)); assertEquals(1, createNodeCounter); @@ -252,7 +255,7 @@ static void testStandard() messageToSendSerializer.add((u32)10); u32 receivedNumber = 0; - database.receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size) + database->receiveCustomMessage(customMessageKey, [&receivedNumber](const void *data, usize size) { sibs::SafeDeserializer deserializer((const u8*)data, size); receivedNumber = deserializer.extract(); @@ -262,7 +265,7 @@ static void testStandard() }); u32 sendCustomMessageResponseNumber = 0; - database.sendCustomMessage(customMessageKey, messageToSendSerializer.getBuffer().data(), messageToSendSerializer.getBuffer().size(), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size) + database->sendCustomMessage(customMessageKey, messageToSendSerializer.getBuffer().data(), messageToSendSerializer.getBuffer().size(), [&sendCustomMessageResponseNumber](bool gotResponse, const void *data, usize size) { if(!gotResponse) { @@ -309,14 +312,14 @@ static void testTwoLocalNodes() DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database1("127.0.0.1", PORT, storagePath1, callbackFuncs); - auto databaseCreateResponse = database1.create(); + std::unique_ptr database1 = Database::connect("127.0.0.1", PORT, storagePath1, callbackFuncs).get(); + auto databaseCreateResponse = database1->create(); DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); - database1.addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); + database1->addData(databaseNode, *adminUserKey, DataView{ (void*)"hello, world!", 13 }); - Database database2("127.0.0.1", PORT, storagePath2, callbackFuncs); - database2.seed(databaseNode); + std::unique_ptr database2 = Database::connect("127.0.0.1", PORT, storagePath2, callbackFuncs).get(); + database2->seed(databaseNode); this_thread::sleep_for(chrono::seconds(5)); } @@ -344,8 +347,8 @@ static void testMemoryUsage() DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database("127.0.0.1", PORT, storagePath, callbackFuncs); - auto databaseCreateResponse = database.create(); + std::unique_ptr database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get(); + auto databaseCreateResponse = database->create(); DatabaseNode databaseNode = { databaseCreateResponse->getNodeEncryptionKey(), databaseCreateResponse->getRequestHash() }; auto adminUserKey = databaseCreateResponse->getNodeAdminKeyPair(); @@ -356,7 +359,7 @@ static void testMemoryUsage() for(int i = 0; i < iterations; ++i) { Log::debug("Memory usage test %d/%d", 1 + i, iterations); - database.addData(databaseNode, *adminUserKey, DataView{ (void*)msg, msgLength }); + database->addData(databaseNode, *adminUserKey, DataView{ (void*)msg, msgLength }); this_thread::sleep_for(chrono::milliseconds(250)); } } @@ -400,22 +403,22 @@ static void testStoreAccount() { DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database("127.0.0.1", PORT, storagePath, callbackFuncs); + std::unique_ptr database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get(); std::string username = "username"; std::string password = "password"; - database.storeUserWithoutNodes(username, password); - database.getStoredNodeUserInfoDecrypted(username, password); + database->storeUserWithoutNodes(username, password); + database->getStoredNodeUserInfoDecrypted(username, password); } { DatabaseCallbackFuncs callbackFuncs { createNodeCallback, addNodeCallback, addUserCallback }; - Database database("127.0.0.1", PORT, storagePath, callbackFuncs); + std::unique_ptr database = Database::connect("127.0.0.1", PORT, storagePath, callbackFuncs).get(); std::string username = "username"; std::string password = "password"; - database.getStoredNodeUserInfoDecrypted(username, password); + database->getStoredNodeUserInfoDecrypted(username, password); } } -- cgit v1.2.3