From 0d83a5c6070cd02449571879c5be0c4c441e81b7 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sun, 21 Oct 2018 14:31:28 +0200 Subject: Async connect --- src/Database.cpp | 59 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/Database.cpp b/src/Database.cpp index 4422e1c..d3cf606 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -85,8 +85,8 @@ namespace odhtdb return hash; } - Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : - bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)), + Database::Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : + bootstrapConnection(sibs::BootstrapConnection::connect(bootstrapNodeAddr).get()), databaseStorage(this, storageDir), onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc), onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc), @@ -165,7 +165,7 @@ namespace odhtdb return; } - databaseStorage.setRemotePeers(bootstrapConnection.getPeers()); + databaseStorage.setRemotePeers(bootstrapConnection->getPeers()); saveIntervalMs = 30000; // 30 sec } }); @@ -178,6 +178,25 @@ namespace odhtdb shuttingDown = true; remoteNodesSaveThread.join(); } + + std::future> Database::connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) + { + std::promise> connectionPromise; + std::future> connectionFuture = connectionPromise.get_future(); + std::thread([](std::promise> connectionPromise, sibs::Ipv4 bootstrapNodeAddr, const boost::filesystem::path storageDir, DatabaseCallbackFuncs callbackFuncs) + { + try + { + Database *database = new Database(bootstrapNodeAddr, storageDir, callbackFuncs); + connectionPromise.set_value(std::unique_ptr(database)); + } + catch(...) + { + connectionPromise.set_exception(std::current_exception()); + } + }, std::move(connectionPromise), sibs::Ipv4(bootstrapNodeAddr, port), storageDir, callbackFuncs).detach(); + return connectionFuture; + } struct ActionGap { @@ -208,7 +227,7 @@ namespace odhtdb databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { Log::debug("Request: Sent create packet to requesting peer"); - bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); }); } @@ -255,7 +274,7 @@ namespace odhtdb } if(!sendData) return; - bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); this_thread::sleep_for(chrono::milliseconds(50)); }, fetchOrder); return true; @@ -281,7 +300,7 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.newDataListenHandle = bootstrapConnection->listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -300,7 +319,7 @@ namespace odhtdb newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.responseKeyListenHandle = bootstrapConnection->listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -314,7 +333,7 @@ namespace odhtdb // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + newSeedInfo.requestOldDataListenHandle = bootstrapConnection->listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { if(!peer) return true; @@ -365,7 +384,7 @@ namespace odhtdb } Log::debug("Sending request for old data"); - bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } void Database::stopSeeding(const Hash &nodeHash) @@ -374,9 +393,9 @@ namespace odhtdb if(seedInfoIt != seedInfoMap.end()) { DhtKey dhtKey(nodeHash); - bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle); - bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle); - bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.newDataListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.requestOldDataListenHandle); + bootstrapConnection->cancelListen(seedInfoIt->second.responseKeyListenHandle); seedInfoMap.erase(seedInfoIt); } } @@ -415,7 +434,7 @@ namespace odhtdb seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); DhtKey dhtKey(*hashRequestKey); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); shared_ptr adminGroupIdResponse = make_shared(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -447,7 +466,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -475,7 +494,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); + bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() @@ -648,12 +667,12 @@ namespace odhtdb InfoHash responseKey = receiveMessageKey; ++responseKey[0]; - return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + return bootstrapConnection->listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { sibs::SafeSerializer serializer = callbackFunc(data, size); if(!serializer.getBuffer().empty()) { - bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); + bootstrapConnection->put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } return true; }); @@ -661,7 +680,7 @@ namespace odhtdb void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size) { - bootstrapConnection.put(requestKey.getKey(), data, size); + bootstrapConnection->put(requestKey.getKey(), data, size); } sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc) @@ -669,7 +688,7 @@ namespace odhtdb InfoHash responseKey = requestKey; ++responseKey[0]; - auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) + auto listener = bootstrapConnection->listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { return callbackFunc(true, data, size); }); @@ -680,7 +699,7 @@ namespace odhtdb void Database::cancelNodeListener(sibs::ListenHandle &nodeListener) { - bootstrapConnection.cancelListen(nodeListener); + bootstrapConnection->cancelListen(nodeListener); } int Database::clearCache() -- cgit v1.2.3