aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-21 14:31:28 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-21 14:31:28 +0200
commit22abf1aa83b2668f918556491a55947be798f89f (patch)
tree7d4658eaf7cf679a4955e6b86fa2bb389002e096 /src
parentacb17dec61717e2577cec9272ce22e2c8f7a2851 (diff)
Async connect
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp59
1 files changed, 39 insertions, 20 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 4422e1c..d3cf606 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -85,8 +85,8 @@ namespace odhtdb
return hash;
}
- Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
- bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)),
+ Database::Database(const sibs::Ipv4 &bootstrapNodeAddr, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
+ bootstrapConnection(sibs::BootstrapConnection::connect(bootstrapNodeAddr).get()),
databaseStorage(this, storageDir),
onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc),
onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc),
@@ -165,7 +165,7 @@ namespace odhtdb
return;
}
- databaseStorage.setRemotePeers(bootstrapConnection.getPeers());
+ databaseStorage.setRemotePeers(bootstrapConnection->getPeers());
saveIntervalMs = 30000; // 30 sec
}
});
@@ -178,6 +178,25 @@ namespace odhtdb
shuttingDown = true;
remoteNodesSaveThread.join();
}
+
+ std::future<std::unique_ptr<Database>> Database::connect(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs)
+ {
+ std::promise<std::unique_ptr<Database>> connectionPromise;
+ std::future<std::unique_ptr<Database>> connectionFuture = connectionPromise.get_future();
+ std::thread([](std::promise<std::unique_ptr<Database>> connectionPromise, sibs::Ipv4 bootstrapNodeAddr, const boost::filesystem::path storageDir, DatabaseCallbackFuncs callbackFuncs)
+ {
+ try
+ {
+ Database *database = new Database(bootstrapNodeAddr, storageDir, callbackFuncs);
+ connectionPromise.set_value(std::unique_ptr<Database>(database));
+ }
+ catch(...)
+ {
+ connectionPromise.set_exception(std::current_exception());
+ }
+ }, std::move(connectionPromise), sibs::Ipv4(bootstrapNodeAddr, port), storageDir, callbackFuncs).detach();
+ return connectionFuture;
+ }
struct ActionGap
{
@@ -208,7 +227,7 @@ namespace odhtdb
databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
Log::debug("Request: Sent create packet to requesting peer");
- bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
});
}
@@ -255,7 +274,7 @@ namespace odhtdb
}
if(!sendData) return;
- bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ bootstrapConnection->put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
this_thread::sleep_for(chrono::milliseconds(50));
}, fetchOrder);
return true;
@@ -281,7 +300,7 @@ namespace odhtdb
Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
DhtKey dhtKey(*nodeToSeed.getRequestHash());
- newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.newDataListenHandle = bootstrapConnection->listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -300,7 +319,7 @@ namespace odhtdb
newSeedInfo.reponseKeyInfoHash = responseKeyShared;
// TODO: If this response key is spammed, generate a new one.
- newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.responseKeyListenHandle = bootstrapConnection->listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -314,7 +333,7 @@ namespace odhtdb
// TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
- newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ newSeedInfo.requestOldDataListenHandle = bootstrapConnection->listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
if(!peer)
return true;
@@ -365,7 +384,7 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -374,9 +393,9 @@ namespace odhtdb
if(seedInfoIt != seedInfoMap.end())
{
DhtKey dhtKey(nodeHash);
- bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle);
- bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle);
- bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.newDataListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.requestOldDataListenHandle);
+ bootstrapConnection->cancelListen(seedInfoIt->second.responseKeyListenHandle);
seedInfoMap.erase(seedInfoIt);
}
}
@@ -415,7 +434,7 @@ namespace odhtdb
seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
DhtKey dhtKey(*hashRequestKey);
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -447,7 +466,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -475,7 +494,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
+ bootstrapConnection->put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc()
@@ -648,12 +667,12 @@ namespace odhtdb
InfoHash responseKey = receiveMessageKey;
++responseKey[0];
- return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ return bootstrapConnection->listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
sibs::SafeSerializer serializer = callbackFunc(data, size);
if(!serializer.getBuffer().empty())
{
- bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
+ bootstrapConnection->put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
return true;
});
@@ -661,7 +680,7 @@ namespace odhtdb
void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size)
{
- bootstrapConnection.put(requestKey.getKey(), data, size);
+ bootstrapConnection->put(requestKey.getKey(), data, size);
}
sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc)
@@ -669,7 +688,7 @@ namespace odhtdb
InfoHash responseKey = requestKey;
++responseKey[0];
- auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
+ auto listener = bootstrapConnection->listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
return callbackFunc(true, data, size);
});
@@ -680,7 +699,7 @@ namespace odhtdb
void Database::cancelNodeListener(sibs::ListenHandle &nodeListener)
{
- bootstrapConnection.cancelListen(nodeListener);
+ bootstrapConnection->cancelListen(nodeListener);
}
int Database::clearCache()