aboutsummaryrefslogtreecommitdiff
path: root/src/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Database.cpp')
-rw-r--r--src/Database.cpp326
1 files changed, 123 insertions, 203 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 564b3d6..2a7a510 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -5,7 +5,6 @@
#include "../include/odhtdb/bin2hex.hpp"
#include "../include/odhtdb/Log.hpp"
#include <boost/uuid/uuid_generators.hpp>
-#include <opendht.h>
#include <sodium/randombytes.h>
#include <thread>
#include <chrono>
@@ -14,7 +13,6 @@
#include <cassert>
#include <sys/time.h>
-using namespace dht;
using namespace std;
using namespace chrono_literals;
@@ -25,8 +23,6 @@ static odhtdb::u64 timeOffsetFraction = 0;
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
-const int OPENDHT_INFOHASH_LEN = 20;
-
namespace odhtdb
{
static boost::uuids::random_generator uuidGen;
@@ -34,24 +30,6 @@ namespace odhtdb
const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;
- const int NODE_PUT_RETRY_TIMES = 3;
-
- static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value, int retryCounter = 0)
- {
- node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok)
- {
- if(!ok)
- {
- if(retryCounter < NODE_PUT_RETRY_TIMES)
- {
- Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES);
- nodePutWithRetry(node, infoHash, value, retryCounter + 1);
- }
- else
- Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES);
- }
- });
- }
class RequestQuarantineException : public runtime_error
{
@@ -108,34 +86,13 @@ namespace odhtdb
}
Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
+ bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)),
databaseStorage(this, storageDir),
onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc),
onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc),
onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc),
shuttingDown(false)
{
- node.run(port , {
- /*.dht_config = */{
- /*.node_config = */{
- /*.node_id = */{},
- /*.network = */0,
- /*.is_bootstrap = */false,
- /*.maintain_storage*/false
- },
- /*.id = */databaseStorage.getIdentity()
- },
- /*.threaded = */true,
- /*.proxy_server = */"",
- /*.push_node_id = */""
- });
- node.setStorageLimit(1024 * 1024 * 1); // 1 Megabyte
- auto portStr = to_string(port);
- node.bootstrap(bootstrapNodeAddr, portStr.c_str());
- const auto &remoteNodes = databaseStorage.getRemoteNodes();
- if(!remoteNodes.empty())
- node.bootstrap(remoteNodes);
- Log::debug("Connecting to bootstrap node (%s) and %u other known nodes that we have connected to previously with port %d", bootstrapNodeAddr, remoteNodes.size(), port);
-
// TODO: Make this work for multiple threads initializing database at same time
++databaseCount;
if(databaseCount == 1)
@@ -207,7 +164,8 @@ namespace odhtdb
if(shuttingDown)
return;
}
- databaseStorage.setRemoteNodes(node.exportNodes());
+
+ databaseStorage.setRemotePeers(bootstrapConnection.getPeers());
saveIntervalMs = 30000; // 30 sec
}
});
@@ -219,7 +177,6 @@ namespace odhtdb
--databaseCount;
shuttingDown = true;
remoteNodesSaveThread.join();
- node.join();
}
struct ActionGap
@@ -228,78 +185,80 @@ namespace odhtdb
u64 range;
};
- void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr<InfoHash> requestResponseInfoHash, const shared_ptr<Value> value, usize valueOffset)
+ bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size)
{
Log::debug("Request: Got request to send old data");
- try
+
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ u16 requestStructureVersion = deserializer.extract<u16>();
+ if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION)
{
- sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset);
- bool userWantsCreateNode = deserializer.extract<u8>() == 1;
- DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
-
- if(userWantsCreateNode)
+ Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
+ return true;
+ }
+ shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
+ deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH);
+
+ bool userWantsCreateNode = deserializer.extract<u8>() == 1;
+ DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
+
+ if(userWantsCreateNode)
+ {
+ Log::debug("Request: Peer wants CreateNode");
+ databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
- Log::debug("Request: Peer wants CreateNode");
- databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
- {
- Log::debug("Request: Sent create packet to requesting peer");
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, *requestResponseInfoHash, value);
- });
- }
-
- vector<unique_ptr<u8[]>> userPublicKeys;
+ Log::debug("Request: Sent create packet to requesting peer");
+ bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ });
+ }
+
+ vector<unique_ptr<u8[]>> userPublicKeys;
+
+ // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
+ DataViewMap<vector<ActionGap>> actionGaps;
+ while(!deserializer.empty())
+ {
+ unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]);
+ deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
+ u64 actionGapStart = deserializer.extract<u64>();
+ u64 actionGapRange = deserializer.extract<u64>();
- // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
- DataViewMap<vector<ActionGap>> actionGaps;
- while(!deserializer.empty())
+ DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
+ actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
+ userPublicKeys.emplace_back(move(userPublicKeyRaw));
+ }
+
+ if(actionGaps.empty())
+ Log::debug("No action gaps received, sending all data");
+
+ // TODO(Performance improvement): Instead of sending several packets, combine them into one
+ databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ {
+ bool sendData = false;
+ auto actionGapsIt = actionGaps.find(creatorPublicKey);
+ if(actionGapsIt == actionGaps.end())
{
- unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]);
- deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
- u64 actionGapStart = deserializer.extract<u64>();
- u64 actionGapRange = deserializer.extract<u64>();
-
- DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
- actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
- userPublicKeys.emplace_back(move(userPublicKeyRaw));
+ Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str());
+ sendData = true;
}
-
- if(actionGaps.empty())
- Log::debug("No action gaps received, sending all data");
-
- // TODO(Performance improvement): Instead of sending several packets, combine them into one
- databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ else
{
- bool sendData = false;
- auto actionGapsIt = actionGaps.find(creatorPublicKey);
- if(actionGapsIt == actionGaps.end())
+ for(const auto &userActionGaps : actionGapsIt->second)
{
- Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str());
- sendData = true;
- }
- else
- {
- for(const auto &userActionGaps : actionGapsIt->second)
+ if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
{
- if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
- {
- Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range);
- sendData = true;
- break;
- }
+ Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range);
+ sendData = true;
+ break;
}
}
-
- if(!sendData) return;
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, *requestResponseInfoHash, value);
- this_thread::sleep_for(chrono::milliseconds(50));
- }, fetchOrder);
- }
- catch (std::exception &e)
- {
- Log::warn("Failed while serving peer, error: %s", e.what());
- }
+ }
+
+ if(!sendData) return;
+ bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ this_thread::sleep_for(chrono::milliseconds(50));
+ }, fetchOrder);
+ return true;
}
void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder)
@@ -322,93 +281,59 @@ namespace odhtdb
Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
DhtKey dhtKey(*nodeToSeed.getRequestHash());
- auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value)
+ newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
+ if(!peer)
+ return true;
+
Log::debug("Seed: New data listener received data...");
- const Hash requestHash(value->data.data(), value->data.size());
+ const Hash requestHash(data, size);
if(requestHash == *nodeToSeed.getRequestHash())
return true;
- //return listenCreateData(value, requestHash, encryptionKey);
else
- return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
+ return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
});
- newSeedInfo.newDataListenerFuture = make_shared<future<size_t>>(move(newDataListenerFuture));
- u8 responseKey[OPENDHT_INFOHASH_LEN];
- randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN);
- shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, OPENDHT_INFOHASH_LEN);;
+ u8 responseKey[sibs::PUBSUB_KEY_LENGTH];
+ randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH);
+ shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, sibs::PUBSUB_KEY_LENGTH);
newSeedInfo.reponseKeyInfoHash = responseKeyShared;
// TODO: If this response key is spammed, generate a new one.
- auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value)
+ newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- if(value->data.size() == OPENDHT_INFOHASH_LEN)
- {
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- InfoHash pingResponseKey;
- deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN);
- shared_ptr<Value> responseValue = make_shared<Value>();
- nodePutWithRetry(&node, pingResponseKey, responseValue);
+ if(!peer)
return true;
- }
-
- const Hash requestHash(value->data.data(), value->data.size());
+
+ const Hash requestHash(data, size);
if(requestHash == *nodeToSeed.getRequestHash())
- return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey());
+ return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey());
else
- return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
+ return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
});
- newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture));
// TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
- auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value)
+ newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
+ if(!peer)
+ return true;
+
try
{
- static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?");
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- u16 requestStructureVersion = deserializer.extract<u16>();
- if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION)
- {
- Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
- return true;
- }
- shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
- deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN);
- if(*responseKeyShared == *requestResponseInfoHash)
- {
- Log::debug("Request: Ignorning request for old data from ourself");
- return true;
- }
- else
- Log::debug("Request: Got request from somebody else");
-
- u8 pingResponseKey[OPENDHT_INFOHASH_LEN];
- randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN);
- InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN);
- usize valueOffset = value->data.size() - deserializer.getSize();
- node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr<Value> _)
- {
- sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset);
- return false;
- });
-
- shared_ptr<Value> pingRequest = make_shared<Value>(pingResponseKey, OPENDHT_INFOHASH_LEN);
- nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest);
+ return sendOldDataToPeer(nodeToSeed, data, size);
}
catch (std::exception &e)
{
Log::warn("Failed while serving peer, error: %s", e.what());
+ return true;
}
- return true;
});
- newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture));
seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo;
sibs::SafeSerializer serializer;
serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
- serializer.add(responseKey, OPENDHT_INFOHASH_LEN);
+ serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH);
bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash());
serializer.add(iHaveCreateNode ? (u8)0 : (u8)1);
serializer.add(fetchOrder);
@@ -440,8 +365,7 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue);
+ bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -449,11 +373,10 @@ namespace odhtdb
auto seedInfoIt = seedInfoMap.find(nodeHash);
if(seedInfoIt != seedInfoMap.end())
{
- // TODO: Verify if doing get on listener future stalls program forever... Opendht documentation is not clear on this
DhtKey dhtKey(nodeHash);
- node.cancelListen(dhtKey.getNewDataListenerKey(), seedInfoIt->second.newDataListenerFuture->get());
- node.cancelListen(dhtKey.getRequestOldDataKey(), seedInfoIt->second.requestOldDataListenerFuture->get());
- node.cancelListen(*seedInfoIt->second.reponseKeyInfoHash, seedInfoIt->second.responseKeyFuture->get());
+ bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle);
+ bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle);
+ bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle);
seedInfoMap.erase(seedInfoIt);
}
}
@@ -488,9 +411,11 @@ namespace odhtdb
databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size));
databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size());
+ DatabaseNode databaseNode = { encryptionKey, hashRequestKey };
+ seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
+
DhtKey dhtKey(*hashRequestKey);
- shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -522,8 +447,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -551,8 +475,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc()
@@ -571,9 +494,9 @@ namespace odhtdb
return timestamp;
}
- void Database::deserializeCreateRequest(const shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
+ void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
{
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
u16 packetStructureVersion = deserializer.extract<u16>();
if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION)
{
@@ -605,12 +528,12 @@ namespace odhtdb
uint8_t adminGroupId[GROUP_ID_LENGTH];
deserializer.extract(adminGroupId, GROUP_ID_LENGTH);
- databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, value->data.data(), value->data.size());
+ databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size);
}
- void Database::deserializeAddRequest(const shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
+ void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
{
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
@@ -647,10 +570,10 @@ namespace odhtdb
u64 newActionCounter = deserializerUnsigned.extract<u64>();
DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize());
- databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
+ databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView);
}
- bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
+ bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
{
Log::debug("Got create data in node %s", hash.toString().c_str());
try
@@ -659,7 +582,7 @@ namespace odhtdb
// the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesNodeExist(hash))
throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
- deserializeCreateRequest(value, hash, encryptionKey);
+ deserializeCreateRequest(data, size, hash, encryptionKey);
}
catch (exception &e)
{
@@ -668,7 +591,7 @@ namespace odhtdb
return true;
}
- bool Database::listenAddData(shared_ptr<dht::Value> value, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
+ bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
{
Log::debug("Got add data in node %s", nodeHash->toString().c_str());
try
@@ -677,7 +600,7 @@ namespace odhtdb
// the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesDataExist(requestDataHash))
throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)");
- deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey);
+ deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey);
}
catch (RequestQuarantineException &e)
{
@@ -720,47 +643,44 @@ namespace odhtdb
return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey);
}
- std::future<size_t> Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
+ sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
{
- dht::InfoHash responseKey = receiveMessageKey;
+ InfoHash responseKey = receiveMessageKey;
++responseKey[0];
- return node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value)
+ return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size());
+ sibs::SafeSerializer serializer = callbackFunc(data, size);
if(!serializer.getBuffer().empty())
{
- shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, responseKey, responseValue);
+ bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
return true;
});
}
- void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data)
+ void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size)
{
- shared_ptr<Value> value = make_shared<Value>(move(data));
- nodePutWithRetry(&node, requestKey, value);
+ bootstrapConnection.put(requestKey.getKey(), data, size);
}
- std::future<size_t> Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc)
+ sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc)
{
- dht::InfoHash responseKey = requestKey;
+ InfoHash responseKey = requestKey;
++responseKey[0];
- auto listener = node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value)
+ auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- return callbackFunc(true, value->data.data(), value->data.size());
+ return callbackFunc(true, data, size);
});
- shared_ptr<Value> value = make_shared<Value>(move(data));
- nodePutWithRetry(&node, requestKey, value);
+ bootstrapConnection.put(requestKey.getKey(), data, size);
return listener;
}
- void Database::cancelNodeListener(const dht::InfoHash &infoHash, std::future<size_t> &nodeListener)
+ void Database::cancelNodeListener(sibs::ListenHandle &nodeListener)
{
- node.cancelListen(infoHash, nodeListener.get());
+ bootstrapConnection.cancelListen(nodeListener);
}
int Database::clearCache()
@@ -768,8 +688,8 @@ namespace odhtdb
return databaseStorage.clearCache();
}
- dht::InfoHash Database::getInfoHash(const void *data, usize size)
+ InfoHash Database::getInfoHash(const void *data, usize size)
{
- return dht::InfoHash::get((const u8*)data, size);
+ return InfoHash::generateHash((const u8*)data, size);
}
}