aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordec05eba <0xdec05eba@gmail.com>2018-10-16 00:38:01 +0200
committerdec05eba <0xdec05eba@gmail.com>2018-10-16 00:39:21 +0200
commit13f2007d104149f69ab7a794d2e119830e638eaa (patch)
tree8dfffb5669d9db6f2328426f5a1cccb72275d92e /src
parent911e62afb82b140e368181f4966442cd5c2e1bd8 (diff)
Replace opendht with sibs pubsub
This should fix issues with memory usage/leaks and make it easier to get peers subscribed to the same key. It will also be easier to modify and also works easier cross platform because of no additional dependencies.
Diffstat (limited to 'src')
-rw-r--r--src/Database.cpp326
-rw-r--r--src/DatabaseStorage.cpp69
-rw-r--r--src/DhtKey.cpp8
-rw-r--r--src/InfoHash.cpp37
4 files changed, 172 insertions, 268 deletions
diff --git a/src/Database.cpp b/src/Database.cpp
index 564b3d6..2a7a510 100644
--- a/src/Database.cpp
+++ b/src/Database.cpp
@@ -5,7 +5,6 @@
#include "../include/odhtdb/bin2hex.hpp"
#include "../include/odhtdb/Log.hpp"
#include <boost/uuid/uuid_generators.hpp>
-#include <opendht.h>
#include <sodium/randombytes.h>
#include <thread>
#include <chrono>
@@ -14,7 +13,6 @@
#include <cassert>
#include <sys/time.h>
-using namespace dht;
using namespace std;
using namespace chrono_literals;
@@ -25,8 +23,6 @@ static odhtdb::u64 timeOffsetFraction = 0;
static thread *ntpThread = nullptr;
static bool timestampSynced = false;
-const int OPENDHT_INFOHASH_LEN = 20;
-
namespace odhtdb
{
static boost::uuids::random_generator uuidGen;
@@ -34,24 +30,6 @@ namespace odhtdb
const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1;
const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1;
- const int NODE_PUT_RETRY_TIMES = 3;
-
- static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value, int retryCounter = 0)
- {
- node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok)
- {
- if(!ok)
- {
- if(retryCounter < NODE_PUT_RETRY_TIMES)
- {
- Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES);
- nodePutWithRetry(node, infoHash, value, retryCounter + 1);
- }
- else
- Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES);
- }
- });
- }
class RequestQuarantineException : public runtime_error
{
@@ -108,34 +86,13 @@ namespace odhtdb
}
Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) :
+ bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)),
databaseStorage(this, storageDir),
onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc),
onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc),
onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc),
shuttingDown(false)
{
- node.run(port , {
- /*.dht_config = */{
- /*.node_config = */{
- /*.node_id = */{},
- /*.network = */0,
- /*.is_bootstrap = */false,
- /*.maintain_storage*/false
- },
- /*.id = */databaseStorage.getIdentity()
- },
- /*.threaded = */true,
- /*.proxy_server = */"",
- /*.push_node_id = */""
- });
- node.setStorageLimit(1024 * 1024 * 1); // 1 Megabyte
- auto portStr = to_string(port);
- node.bootstrap(bootstrapNodeAddr, portStr.c_str());
- const auto &remoteNodes = databaseStorage.getRemoteNodes();
- if(!remoteNodes.empty())
- node.bootstrap(remoteNodes);
- Log::debug("Connecting to bootstrap node (%s) and %u other known nodes that we have connected to previously with port %d", bootstrapNodeAddr, remoteNodes.size(), port);
-
// TODO: Make this work for multiple threads initializing database at same time
++databaseCount;
if(databaseCount == 1)
@@ -207,7 +164,8 @@ namespace odhtdb
if(shuttingDown)
return;
}
- databaseStorage.setRemoteNodes(node.exportNodes());
+
+ databaseStorage.setRemotePeers(bootstrapConnection.getPeers());
saveIntervalMs = 30000; // 30 sec
}
});
@@ -219,7 +177,6 @@ namespace odhtdb
--databaseCount;
shuttingDown = true;
remoteNodesSaveThread.join();
- node.join();
}
struct ActionGap
@@ -228,78 +185,80 @@ namespace odhtdb
u64 range;
};
- void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr<InfoHash> requestResponseInfoHash, const shared_ptr<Value> value, usize valueOffset)
+ bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size)
{
Log::debug("Request: Got request to send old data");
- try
+
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
+ u16 requestStructureVersion = deserializer.extract<u16>();
+ if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION)
{
- sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset);
- bool userWantsCreateNode = deserializer.extract<u8>() == 1;
- DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
-
- if(userWantsCreateNode)
+ Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
+ return true;
+ }
+ shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
+ deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH);
+
+ bool userWantsCreateNode = deserializer.extract<u8>() == 1;
+ DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>();
+
+ if(userWantsCreateNode)
+ {
+ Log::debug("Request: Peer wants CreateNode");
+ databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
{
- Log::debug("Request: Peer wants CreateNode");
- databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData)
- {
- Log::debug("Request: Sent create packet to requesting peer");
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, *requestResponseInfoHash, value);
- });
- }
-
- vector<unique_ptr<u8[]>> userPublicKeys;
+ Log::debug("Request: Sent create packet to requesting peer");
+ bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ });
+ }
+
+ vector<unique_ptr<u8[]>> userPublicKeys;
+
+ // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
+ DataViewMap<vector<ActionGap>> actionGaps;
+ while(!deserializer.empty())
+ {
+ unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]);
+ deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
+ u64 actionGapStart = deserializer.extract<u64>();
+ u64 actionGapRange = deserializer.extract<u64>();
- // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants
- DataViewMap<vector<ActionGap>> actionGaps;
- while(!deserializer.empty())
+ DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
+ actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
+ userPublicKeys.emplace_back(move(userPublicKeyRaw));
+ }
+
+ if(actionGaps.empty())
+ Log::debug("No action gaps received, sending all data");
+
+ // TODO(Performance improvement): Instead of sending several packets, combine them into one
+ databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ {
+ bool sendData = false;
+ auto actionGapsIt = actionGaps.find(creatorPublicKey);
+ if(actionGapsIt == actionGaps.end())
{
- unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]);
- deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
- u64 actionGapStart = deserializer.extract<u64>();
- u64 actionGapRange = deserializer.extract<u64>();
-
- DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES);
- actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange });
- userPublicKeys.emplace_back(move(userPublicKeyRaw));
+ Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str());
+ sendData = true;
}
-
- if(actionGaps.empty())
- Log::debug("No action gaps received, sending all data");
-
- // TODO(Performance improvement): Instead of sending several packets, combine them into one
- databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter)
+ else
{
- bool sendData = false;
- auto actionGapsIt = actionGaps.find(creatorPublicKey);
- if(actionGapsIt == actionGaps.end())
+ for(const auto &userActionGaps : actionGapsIt->second)
{
- Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str());
- sendData = true;
- }
- else
- {
- for(const auto &userActionGaps : actionGapsIt->second)
+ if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
{
- if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range)
- {
- Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range);
- sendData = true;
- break;
- }
+ Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range);
+ sendData = true;
+ break;
}
}
-
- if(!sendData) return;
- shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size);
- nodePutWithRetry(&node, *requestResponseInfoHash, value);
- this_thread::sleep_for(chrono::milliseconds(50));
- }, fetchOrder);
- }
- catch (std::exception &e)
- {
- Log::warn("Failed while serving peer, error: %s", e.what());
- }
+ }
+
+ if(!sendData) return;
+ bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size);
+ this_thread::sleep_for(chrono::milliseconds(50));
+ }, fetchOrder);
+ return true;
}
void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder)
@@ -322,93 +281,59 @@ namespace odhtdb
Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str());
DhtKey dhtKey(*nodeToSeed.getRequestHash());
- auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value)
+ newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
+ if(!peer)
+ return true;
+
Log::debug("Seed: New data listener received data...");
- const Hash requestHash(value->data.data(), value->data.size());
+ const Hash requestHash(data, size);
if(requestHash == *nodeToSeed.getRequestHash())
return true;
- //return listenCreateData(value, requestHash, encryptionKey);
else
- return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
+ return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
});
- newSeedInfo.newDataListenerFuture = make_shared<future<size_t>>(move(newDataListenerFuture));
- u8 responseKey[OPENDHT_INFOHASH_LEN];
- randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN);
- shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, OPENDHT_INFOHASH_LEN);;
+ u8 responseKey[sibs::PUBSUB_KEY_LENGTH];
+ randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH);
+ shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, sibs::PUBSUB_KEY_LENGTH);
newSeedInfo.reponseKeyInfoHash = responseKeyShared;
// TODO: If this response key is spammed, generate a new one.
- auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value)
+ newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- if(value->data.size() == OPENDHT_INFOHASH_LEN)
- {
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- InfoHash pingResponseKey;
- deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN);
- shared_ptr<Value> responseValue = make_shared<Value>();
- nodePutWithRetry(&node, pingResponseKey, responseValue);
+ if(!peer)
return true;
- }
-
- const Hash requestHash(value->data.data(), value->data.size());
+
+ const Hash requestHash(data, size);
if(requestHash == *nodeToSeed.getRequestHash())
- return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey());
+ return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey());
else
- return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
+ return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey());
});
- newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture));
// TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data.
// This is to prevent too many peers from responding to a request to get old data.
- auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value)
+ newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
+ if(!peer)
+ return true;
+
try
{
- static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?");
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
- u16 requestStructureVersion = deserializer.extract<u16>();
- if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION)
- {
- Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
- return true;
- }
- shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>();
- deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN);
- if(*responseKeyShared == *requestResponseInfoHash)
- {
- Log::debug("Request: Ignorning request for old data from ourself");
- return true;
- }
- else
- Log::debug("Request: Got request from somebody else");
-
- u8 pingResponseKey[OPENDHT_INFOHASH_LEN];
- randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN);
- InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN);
- usize valueOffset = value->data.size() - deserializer.getSize();
- node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr<Value> _)
- {
- sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset);
- return false;
- });
-
- shared_ptr<Value> pingRequest = make_shared<Value>(pingResponseKey, OPENDHT_INFOHASH_LEN);
- nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest);
+ return sendOldDataToPeer(nodeToSeed, data, size);
}
catch (std::exception &e)
{
Log::warn("Failed while serving peer, error: %s", e.what());
+ return true;
}
- return true;
});
- newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture));
seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo;
sibs::SafeSerializer serializer;
serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION);
- serializer.add(responseKey, OPENDHT_INFOHASH_LEN);
+ serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH);
bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash());
serializer.add(iHaveCreateNode ? (u8)0 : (u8)1);
serializer.add(fetchOrder);
@@ -440,8 +365,7 @@ namespace odhtdb
}
Log::debug("Sending request for old data");
- shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue);
+ bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
void Database::stopSeeding(const Hash &nodeHash)
@@ -449,11 +373,10 @@ namespace odhtdb
auto seedInfoIt = seedInfoMap.find(nodeHash);
if(seedInfoIt != seedInfoMap.end())
{
- // TODO: Verify if doing get on listener future stalls program forever... Opendht documentation is not clear on this
DhtKey dhtKey(nodeHash);
- node.cancelListen(dhtKey.getNewDataListenerKey(), seedInfoIt->second.newDataListenerFuture->get());
- node.cancelListen(dhtKey.getRequestOldDataKey(), seedInfoIt->second.requestOldDataListenerFuture->get());
- node.cancelListen(*seedInfoIt->second.reponseKeyInfoHash, seedInfoIt->second.responseKeyFuture->get());
+ bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle);
+ bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle);
+ bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle);
seedInfoMap.erase(seedInfoIt);
}
}
@@ -488,9 +411,11 @@ namespace odhtdb
databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size));
databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size());
+ DatabaseNode databaseNode = { encryptionKey, hashRequestKey };
+ seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST);
+
DhtKey dhtKey(*hashRequestKey);
- shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH);
memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH);
@@ -522,8 +447,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo)
@@ -551,8 +475,7 @@ namespace odhtdb
databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView);
DhtKey dhtKey(*nodeInfo.getRequestHash());
- shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size);
- nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue);
+ bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size);
}
ntp::NtpTimestamp Database::getSyncedTimestampUtc()
@@ -571,9 +494,9 @@ namespace odhtdb
return timestamp;
}
- void Database::deserializeCreateRequest(const shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
+ void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
{
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
u16 packetStructureVersion = deserializer.extract<u16>();
if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION)
{
@@ -605,12 +528,12 @@ namespace odhtdb
uint8_t adminGroupId[GROUP_ID_LENGTH];
deserializer.extract(adminGroupId, GROUP_ID_LENGTH);
- databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, value->data.data(), value->data.size());
+ databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size);
}
- void Database::deserializeAddRequest(const shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
+ void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
{
- sibs::SafeDeserializer deserializer(value->data.data(), value->data.size());
+ sibs::SafeDeserializer deserializer((const u8*)data, size);
char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES];
deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES);
@@ -647,10 +570,10 @@ namespace odhtdb
u64 newActionCounter = deserializerUnsigned.extract<u64>();
DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize());
- databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView);
+ databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView);
}
- bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
+ bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey)
{
Log::debug("Got create data in node %s", hash.toString().c_str());
try
@@ -659,7 +582,7 @@ namespace odhtdb
// the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesNodeExist(hash))
throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)");
- deserializeCreateRequest(value, hash, encryptionKey);
+ deserializeCreateRequest(data, size, hash, encryptionKey);
}
catch (exception &e)
{
@@ -668,7 +591,7 @@ namespace odhtdb
return true;
}
- bool Database::listenAddData(shared_ptr<dht::Value> value, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
+ bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey)
{
Log::debug("Got add data in node %s", nodeHash->toString().c_str());
try
@@ -677,7 +600,7 @@ namespace odhtdb
// the database has constraint to deal with this in multi-threaded way
if(databaseStorage.doesDataExist(requestDataHash))
throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)");
- deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey);
+ deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey);
}
catch (RequestQuarantineException &e)
{
@@ -720,47 +643,44 @@ namespace odhtdb
return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey);
}
- std::future<size_t> Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
+ sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc)
{
- dht::InfoHash responseKey = receiveMessageKey;
+ InfoHash responseKey = receiveMessageKey;
++responseKey[0];
- return node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value)
+ return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size());
+ sibs::SafeSerializer serializer = callbackFunc(data, size);
if(!serializer.getBuffer().empty())
{
- shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer()));
- nodePutWithRetry(&node, responseKey, responseValue);
+ bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size());
}
return true;
});
}
- void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data)
+ void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size)
{
- shared_ptr<Value> value = make_shared<Value>(move(data));
- nodePutWithRetry(&node, requestKey, value);
+ bootstrapConnection.put(requestKey.getKey(), data, size);
}
- std::future<size_t> Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc)
+ sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc)
{
- dht::InfoHash responseKey = requestKey;
+ InfoHash responseKey = requestKey;
++responseKey[0];
- auto listener = node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value)
+ auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size)
{
- return callbackFunc(true, value->data.data(), value->data.size());
+ return callbackFunc(true, data, size);
});
- shared_ptr<Value> value = make_shared<Value>(move(data));
- nodePutWithRetry(&node, requestKey, value);
+ bootstrapConnection.put(requestKey.getKey(), data, size);
return listener;
}
- void Database::cancelNodeListener(const dht::InfoHash &infoHash, std::future<size_t> &nodeListener)
+ void Database::cancelNodeListener(sibs::ListenHandle &nodeListener)
{
- node.cancelListen(infoHash, nodeListener.get());
+ bootstrapConnection.cancelListen(nodeListener);
}
int Database::clearCache()
@@ -768,8 +688,8 @@ namespace odhtdb
return databaseStorage.clearCache();
}
- dht::InfoHash Database::getInfoHash(const void *data, usize size)
+ InfoHash Database::getInfoHash(const void *data, usize size)
{
- return dht::InfoHash::get((const u8*)data, size);
+ return InfoHash::generateHash((const u8*)data, size);
}
}
diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp
index 9398254..14b74e8 100644
--- a/src/DatabaseStorage.cpp
+++ b/src/DatabaseStorage.cpp
@@ -158,18 +158,6 @@ namespace odhtdb
metadataSerializer.add(STORAGE_VERSION);
randombytes_buf(passwordSalt, PASSWORD_SALT_LEN);
metadataSerializer.add(passwordSalt, PASSWORD_SALT_LEN);
-
- identity = dht::crypto::generateIdentity();
- dht::Blob privateKeyData = identity.first->serialize();
- metadataSerializer.add((u16)privateKeyData.size());
- metadataSerializer.add(privateKeyData.data(), privateKeyData.size());
-
- dht::Blob certificateData;
- identity.second->pack(certificateData);
- metadataSerializer.add((u16)certificateData.size());
- metadataSerializer.add(certificateData.data(), certificateData.size());
-
- fileAppend(metadataFilePath, { metadataSerializer.getBuffer().data(), metadataSerializer.getBuffer().size() });
}
catch(sibs::DeserializeException &e)
{
@@ -335,37 +323,13 @@ namespace odhtdb
throw std::runtime_error("Wrong storage version!");
deserializer.extract(passwordSalt, PASSWORD_SALT_LEN);
-
- u16 privateKeySize = deserializer.extract<u16>();
- dht::Blob privateKeyRaw;
- privateKeyRaw.resize(privateKeySize);
- deserializer.extract(&privateKeyRaw[0], privateKeySize);
- identity.first = make_shared<dht::crypto::PrivateKey>(privateKeyRaw);
-
- u16 certificateSize = deserializer.extract<u16>();
- dht::Blob certificateRaw;
- certificateRaw.resize(certificateSize);
- deserializer.extract(&certificateRaw[0], certificateSize);
- identity.second = make_shared<dht::crypto::Certificate>(certificateRaw);
-
assert(deserializer.empty());
}
void DatabaseStorage::loadRemoteNodesFromFile()
{
OwnedByteArray remoteNodesFileContent = fileGetContent(remoteNodesFilePath);
- msgpack::unpacker pac;
- pac.reserve_buffer(remoteNodesFileContent.size);
- memcpy(pac.buffer(), remoteNodesFileContent.data, remoteNodesFileContent.size);
- pac.buffer_consumed(remoteNodesFileContent.size);
-
- msgpack::object_handle oh;
- while(pac.next(oh))
- {
- auto importedNodes = oh.get().as<vector<dht::NodeExport>>();
- remoteNodes.reserve(remoteNodes.size() + importedNodes.size());
- remoteNodes.insert(remoteNodes.end(), importedNodes.begin(), importedNodes.end());
- }
+ remotePeers = sibs::DirectConnectionsUtils::deserializePeers(remoteNodesFileContent.data, remoteNodesFileContent.size);
}
static void sqlite_step_throw_on_failure(sqlite3 *db, sqlite3_stmt *stmt, const char *description)
@@ -1025,29 +989,17 @@ namespace odhtdb
decryptNodeData(nodeHash, nodeDecryptionKeyResult.second);
}
- struct RemoteNodePacker
+ const vector<std::shared_ptr<sibs::DirectConnectionPeer>>& DatabaseStorage::getRemotePeers() const
{
- sibs::SafeSerializer serializer;
-
- RemoteNodePacker& write(const char *data, size_t size)
- {
- serializer.add((const u8*)data, size);
- return *this;
- }
- };
-
- const vector<dht::NodeExport>& DatabaseStorage::getRemoteNodes() const
- {
- return remoteNodes;
+ return remotePeers;
}
- void DatabaseStorage::setRemoteNodes(const std::vector<dht::NodeExport> &remoteNodes)
+ void DatabaseStorage::setRemotePeers(const std::vector<std::shared_ptr<sibs::DirectConnectionPeer>> &remotePeers)
{
- Log::debug("Storing %u remote nodes", remoteNodes.size());
- this->remoteNodes = remoteNodes;
- RemoteNodePacker remoteNodePacker;
- msgpack::pack(remoteNodePacker, remoteNodes);
- fileOverwrite(remoteNodesFilePath, DataView(remoteNodePacker.serializer.getBuffer().data(), remoteNodePacker.serializer.getBuffer().size()));
+ Log::debug("Storing %u remote peers", remotePeers.size());
+ this->remotePeers = remotePeers;
+ std::vector<u8> serializedPeers = sibs::DirectConnectionsUtils::serializePeers(remotePeers);
+ fileOverwrite(remoteNodesFilePath, DataView(serializedPeers.data(), serializedPeers.size()));
}
vector<OwnedByteArray> DatabaseStorage::getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const
@@ -1292,11 +1244,6 @@ namespace odhtdb
return true;
}
- const dht::crypto::Identity& DatabaseStorage::getIdentity() const
- {
- return identity;
- }
-
void DatabaseStorage::update()
{
auto time = chrono::high_resolution_clock::now().time_since_epoch();
diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp
index 1508657..b84dbb7 100644
--- a/src/DhtKey.cpp
+++ b/src/DhtKey.cpp
@@ -8,24 +8,24 @@ namespace odhtdb
firstByteOriginalValue = infoHash[0];
}
- DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash)
+ DhtKey::DhtKey(const InfoHash &_infoHash) : infoHash(_infoHash)
{
firstByteOriginalValue = infoHash[0];
}
- dht::InfoHash DhtKey::getNewDataListenerKey()
+ InfoHash DhtKey::getNewDataListenerKey()
{
infoHash[0] = firstByteOriginalValue;
return infoHash;
}
- dht::InfoHash DhtKey::getRequestOldDataKey()
+ InfoHash DhtKey::getRequestOldDataKey()
{
infoHash[0] = firstByteOriginalValue + 1;
return infoHash;
}
- dht::InfoHash DhtKey::getPingKey()
+ InfoHash DhtKey::getPingKey()
{
infoHash[0] = firstByteOriginalValue + 10;
return infoHash;
diff --git a/src/InfoHash.cpp b/src/InfoHash.cpp
new file mode 100644
index 0000000..90e6e31
--- /dev/null
+++ b/src/InfoHash.cpp
@@ -0,0 +1,37 @@
+#include "../include/odhtdb/InfoHash.hpp"
+#include <sodium/crypto_generichash_blake2b.h>
+#include <sodium/core.h>
+#include <algorithm>
+
+namespace odhtdb
+{
+ InfoHash::InfoHash()
+ {
+
+ }
+
+ InfoHash::InfoHash(const u8 *data, const size_t size) :
+ key(data, size)
+ {
+
+ }
+
+ InfoHash InfoHash::generateHash(const u8 *data, const size_t size)
+ {
+ InfoHash infoHash;
+ int result = crypto_generichash_blake2b((unsigned char*)&infoHash.key.data[0], infoHash.key.data.size(), (const unsigned char*)data, size, nullptr, 0);
+ if(result < 0)
+ throw InfoHashException("Failed to hash data using blake2b");
+ return infoHash;
+ }
+
+ bool InfoHash::operator == (const InfoHash &other) const
+ {
+ return key == other.key;
+ }
+
+ bool InfoHash::operator != (const InfoHash &other) const
+ {
+ return !(*this == other);
+ }
+} \ No newline at end of file