From 0e627b69d4d0a8d01a21e4dc9bd7be370c0a1245 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Tue, 16 Oct 2018 00:38:01 +0200 Subject: Replace opendht with sibs pubsub This should fix issues with memory usage/leaks and make it easier to get peers subscribed to the same key. It will also be easier to modify and also works easier cross platform because of no additional dependencies. --- src/Database.cpp | 326 ++++++++++++++++++------------------------------ src/DatabaseStorage.cpp | 69 ++-------- src/DhtKey.cpp | 8 +- src/InfoHash.cpp | 37 ++++++ 4 files changed, 172 insertions(+), 268 deletions(-) create mode 100644 src/InfoHash.cpp (limited to 'src') diff --git a/src/Database.cpp b/src/Database.cpp index 564b3d6..2a7a510 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,7 +5,6 @@ #include "../include/odhtdb/bin2hex.hpp" #include "../include/odhtdb/Log.hpp" #include -#include #include #include #include @@ -14,7 +13,6 @@ #include #include -using namespace dht; using namespace std; using namespace chrono_literals; @@ -25,8 +23,6 @@ static odhtdb::u64 timeOffsetFraction = 0; static thread *ntpThread = nullptr; static bool timestampSynced = false; -const int OPENDHT_INFOHASH_LEN = 20; - namespace odhtdb { static boost::uuids::random_generator uuidGen; @@ -34,24 +30,6 @@ namespace odhtdb const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; - const int NODE_PUT_RETRY_TIMES = 3; - - static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr value, int retryCounter = 0) - { - node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok) - { - if(!ok) - { - if(retryCounter < NODE_PUT_RETRY_TIMES) - { - Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES); - nodePutWithRetry(node, infoHash, value, retryCounter + 1); - } - else - Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES); - } - }); - } class RequestQuarantineException : public runtime_error { @@ -108,34 +86,13 @@ namespace odhtdb } Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : + bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)), databaseStorage(this, storageDir), onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc), onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc), onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc), shuttingDown(false) { - node.run(port , { - /*.dht_config = */{ - /*.node_config = */{ - /*.node_id = */{}, - /*.network = */0, - /*.is_bootstrap = */false, - /*.maintain_storage*/false - }, - /*.id = */databaseStorage.getIdentity() - }, - /*.threaded = */true, - /*.proxy_server = */"", - /*.push_node_id = */"" - }); - node.setStorageLimit(1024 * 1024 * 1); // 1 Megabyte - auto portStr = to_string(port); - node.bootstrap(bootstrapNodeAddr, portStr.c_str()); - const auto &remoteNodes = databaseStorage.getRemoteNodes(); - if(!remoteNodes.empty()) - node.bootstrap(remoteNodes); - Log::debug("Connecting to bootstrap node (%s) and %u other known nodes that we have connected to previously with port %d", bootstrapNodeAddr, remoteNodes.size(), port); - // TODO: Make this work for multiple threads initializing database at same time ++databaseCount; if(databaseCount == 1) @@ -207,7 +164,8 @@ namespace odhtdb if(shuttingDown) return; } - databaseStorage.setRemoteNodes(node.exportNodes()); + + databaseStorage.setRemotePeers(bootstrapConnection.getPeers()); saveIntervalMs = 30000; // 30 sec } }); @@ -219,7 +177,6 @@ namespace odhtdb --databaseCount; shuttingDown = true; remoteNodesSaveThread.join(); - node.join(); } struct ActionGap @@ -228,78 +185,80 @@ namespace odhtdb u64 range; }; - void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr requestResponseInfoHash, const shared_ptr value, usize valueOffset) + bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size) { Log::debug("Request: Got request to send old data"); - try + + sibs::SafeDeserializer deserializer((const u8*)data, size); + u16 requestStructureVersion = deserializer.extract(); + if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) { - sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset); - bool userWantsCreateNode = deserializer.extract() == 1; - DatabaseFetchOrder fetchOrder = deserializer.extract(); - - if(userWantsCreateNode) + Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); + return true; + } + shared_ptr requestResponseInfoHash = make_shared(); + deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH); + + bool userWantsCreateNode = deserializer.extract() == 1; + DatabaseFetchOrder fetchOrder = deserializer.extract(); + + if(userWantsCreateNode) + { + Log::debug("Request: Peer wants CreateNode"); + databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { - Log::debug("Request: Peer wants CreateNode"); - databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) - { - Log::debug("Request: Sent create packet to requesting peer"); - shared_ptr value = make_shared((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - }); - } - - vector> userPublicKeys; + Log::debug("Request: Sent create packet to requesting peer"); + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + }); + } + + vector> userPublicKeys; + + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap> actionGaps; + while(!deserializer.empty()) + { + unique_ptr userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); + deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract(); + u64 actionGapRange = deserializer.extract(); - // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants - DataViewMap> actionGaps; - while(!deserializer.empty()) + DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + userPublicKeys.emplace_back(move(userPublicKeyRaw)); + } + + if(actionGaps.empty()) + Log::debug("No action gaps received, sending all data"); + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) { - unique_ptr userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); - deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - u64 actionGapStart = deserializer.extract(); - u64 actionGapRange = deserializer.extract(); - - DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); - userPublicKeys.emplace_back(move(userPublicKeyRaw)); + Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); + sendData = true; } - - if(actionGaps.empty()) - Log::debug("No action gaps received, sending all data"); - - // TODO(Performance improvement): Instead of sending several packets, combine them into one - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + else { - bool sendData = false; - auto actionGapsIt = actionGaps.find(creatorPublicKey); - if(actionGapsIt == actionGaps.end()) + for(const auto &userActionGaps : actionGapsIt->second) { - Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); - sendData = true; - } - else - { - for(const auto &userActionGaps : actionGapsIt->second) + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) { - if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) - { - Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); - sendData = true; - break; - } + Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); + sendData = true; + break; } } - - if(!sendData) return; - shared_ptr value = make_shared((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - this_thread::sleep_for(chrono::milliseconds(50)); - }, fetchOrder); - } - catch (std::exception &e) - { - Log::warn("Failed while serving peer, error: %s", e.what()); - } + } + + if(!sendData) return; + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + this_thread::sleep_for(chrono::milliseconds(50)); + }, fetchOrder); + return true; } void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder) @@ -322,93 +281,59 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr value) + newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + Log::debug("Seed: New data listener received data..."); - const Hash requestHash(value->data.data(), value->data.size()); + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) return true; - //return listenCreateData(value, requestHash, encryptionKey); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.newDataListenerFuture = make_shared>(move(newDataListenerFuture)); - u8 responseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN); - shared_ptr responseKeyShared = make_shared(responseKey, OPENDHT_INFOHASH_LEN);; + u8 responseKey[sibs::PUBSUB_KEY_LENGTH]; + randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH); + shared_ptr responseKeyShared = make_shared(responseKey, sibs::PUBSUB_KEY_LENGTH); newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr value) + newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - if(value->data.size() == OPENDHT_INFOHASH_LEN) - { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - InfoHash pingResponseKey; - deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN); - shared_ptr responseValue = make_shared(); - nodePutWithRetry(&node, pingResponseKey, responseValue); + if(!peer) return true; - } - - const Hash requestHash(value->data.data(), value->data.size()); + + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) - return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey()); + return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey()); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.responseKeyFuture = make_shared>(move(responseKeyFuture)); // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr value) + newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + try { - static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u16 requestStructureVersion = deserializer.extract(); - if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) - { - Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - return true; - } - shared_ptr requestResponseInfoHash = make_shared(); - deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN); - if(*responseKeyShared == *requestResponseInfoHash) - { - Log::debug("Request: Ignorning request for old data from ourself"); - return true; - } - else - Log::debug("Request: Got request from somebody else"); - - u8 pingResponseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN); - InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN); - usize valueOffset = value->data.size() - deserializer.getSize(); - node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr _) - { - sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset); - return false; - }); - - shared_ptr pingRequest = make_shared(pingResponseKey, OPENDHT_INFOHASH_LEN); - nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest); + return sendOldDataToPeer(nodeToSeed, data, size); } catch (std::exception &e) { Log::warn("Failed while serving peer, error: %s", e.what()); + return true; } - return true; }); - newSeedInfo.requestOldDataListenerFuture = make_shared>(move(requestOldDataListenerFuture)); seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo; sibs::SafeSerializer serializer; serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - serializer.add(responseKey, OPENDHT_INFOHASH_LEN); + serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH); bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash()); serializer.add(iHaveCreateNode ? (u8)0 : (u8)1); serializer.add(fetchOrder); @@ -440,8 +365,7 @@ namespace odhtdb } Log::debug("Sending request for old data"); - shared_ptr requestValue = make_shared(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue); + bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } void Database::stopSeeding(const Hash &nodeHash) @@ -449,11 +373,10 @@ namespace odhtdb auto seedInfoIt = seedInfoMap.find(nodeHash); if(seedInfoIt != seedInfoMap.end()) { - // TODO: Verify if doing get on listener future stalls program forever... Opendht documentation is not clear on this DhtKey dhtKey(nodeHash); - node.cancelListen(dhtKey.getNewDataListenerKey(), seedInfoIt->second.newDataListenerFuture->get()); - node.cancelListen(dhtKey.getRequestOldDataKey(), seedInfoIt->second.requestOldDataListenerFuture->get()); - node.cancelListen(*seedInfoIt->second.reponseKeyInfoHash, seedInfoIt->second.responseKeyFuture->get()); + bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle); seedInfoMap.erase(seedInfoIt); } } @@ -488,9 +411,11 @@ namespace odhtdb databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size)); databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size()); + DatabaseNode databaseNode = { encryptionKey, hashRequestKey }; + seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); + DhtKey dhtKey(*hashRequestKey); - shared_ptr createDataValue = make_shared(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); shared_ptr adminGroupIdResponse = make_shared(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -522,8 +447,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -551,8 +475,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr addDataValue = make_shared((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() @@ -571,9 +494,9 @@ namespace odhtdb return timestamp; } - void Database::deserializeCreateRequest(const shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) + void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); u16 packetStructureVersion = deserializer.extract(); if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION) { @@ -605,12 +528,12 @@ namespace odhtdb uint8_t adminGroupId[GROUP_ID_LENGTH]; deserializer.extract(adminGroupId, GROUP_ID_LENGTH); - databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, value->data.data(), value->data.size()); + databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size); } - void Database::deserializeAddRequest(const shared_ptr &value, const Hash &requestDataHash, const std::shared_ptr &nodeHash, const shared_ptr encryptionKey) + void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr &nodeHash, const shared_ptr encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); @@ -647,10 +570,10 @@ namespace odhtdb u64 newActionCounter = deserializerUnsigned.extract(); DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); - databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); + databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView); } - bool Database::listenCreateData(shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) + bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr encryptionKey) { Log::debug("Got create data in node %s", hash.toString().c_str()); try @@ -659,7 +582,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesNodeExist(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); - deserializeCreateRequest(value, hash, encryptionKey); + deserializeCreateRequest(data, size, hash, encryptionKey); } catch (exception &e) { @@ -668,7 +591,7 @@ namespace odhtdb return true; } - bool Database::listenAddData(shared_ptr value, const Hash &requestDataHash, const std::shared_ptr nodeHash, const shared_ptr encryptionKey) + bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr nodeHash, const shared_ptr encryptionKey) { Log::debug("Got add data in node %s", nodeHash->toString().c_str()); try @@ -677,7 +600,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesDataExist(requestDataHash)) throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)"); - deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey); + deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey); } catch (RequestQuarantineException &e) { @@ -720,47 +643,44 @@ namespace odhtdb return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey); } - std::future Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = receiveMessageKey; + InfoHash responseKey = receiveMessageKey; ++responseKey[0]; - return node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr value) + return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); + sibs::SafeSerializer serializer = callbackFunc(data, size); if(!serializer.getBuffer().empty()) { - shared_ptr responseValue = make_shared(move(serializer.getBuffer())); - nodePutWithRetry(&node, responseKey, responseValue); + bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } return true; }); } - void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector &&data) + void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size) { - shared_ptr value = make_shared(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); } - std::future Database::sendCustomMessage(const dht::InfoHash &requestKey, vector &&data, SendCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = requestKey; + InfoHash responseKey = requestKey; ++responseKey[0]; - auto listener = node.listen(responseKey, [callbackFunc](const shared_ptr value) + auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - return callbackFunc(true, value->data.data(), value->data.size()); + return callbackFunc(true, data, size); }); - shared_ptr value = make_shared(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); return listener; } - void Database::cancelNodeListener(const dht::InfoHash &infoHash, std::future &nodeListener) + void Database::cancelNodeListener(sibs::ListenHandle &nodeListener) { - node.cancelListen(infoHash, nodeListener.get()); + bootstrapConnection.cancelListen(nodeListener); } int Database::clearCache() @@ -768,8 +688,8 @@ namespace odhtdb return databaseStorage.clearCache(); } - dht::InfoHash Database::getInfoHash(const void *data, usize size) + InfoHash Database::getInfoHash(const void *data, usize size) { - return dht::InfoHash::get((const u8*)data, size); + return InfoHash::generateHash((const u8*)data, size); } } diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index 9398254..14b74e8 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -158,18 +158,6 @@ namespace odhtdb metadataSerializer.add(STORAGE_VERSION); randombytes_buf(passwordSalt, PASSWORD_SALT_LEN); metadataSerializer.add(passwordSalt, PASSWORD_SALT_LEN); - - identity = dht::crypto::generateIdentity(); - dht::Blob privateKeyData = identity.first->serialize(); - metadataSerializer.add((u16)privateKeyData.size()); - metadataSerializer.add(privateKeyData.data(), privateKeyData.size()); - - dht::Blob certificateData; - identity.second->pack(certificateData); - metadataSerializer.add((u16)certificateData.size()); - metadataSerializer.add(certificateData.data(), certificateData.size()); - - fileAppend(metadataFilePath, { metadataSerializer.getBuffer().data(), metadataSerializer.getBuffer().size() }); } catch(sibs::DeserializeException &e) { @@ -335,37 +323,13 @@ namespace odhtdb throw std::runtime_error("Wrong storage version!"); deserializer.extract(passwordSalt, PASSWORD_SALT_LEN); - - u16 privateKeySize = deserializer.extract(); - dht::Blob privateKeyRaw; - privateKeyRaw.resize(privateKeySize); - deserializer.extract(&privateKeyRaw[0], privateKeySize); - identity.first = make_shared(privateKeyRaw); - - u16 certificateSize = deserializer.extract(); - dht::Blob certificateRaw; - certificateRaw.resize(certificateSize); - deserializer.extract(&certificateRaw[0], certificateSize); - identity.second = make_shared(certificateRaw); - assert(deserializer.empty()); } void DatabaseStorage::loadRemoteNodesFromFile() { OwnedByteArray remoteNodesFileContent = fileGetContent(remoteNodesFilePath); - msgpack::unpacker pac; - pac.reserve_buffer(remoteNodesFileContent.size); - memcpy(pac.buffer(), remoteNodesFileContent.data, remoteNodesFileContent.size); - pac.buffer_consumed(remoteNodesFileContent.size); - - msgpack::object_handle oh; - while(pac.next(oh)) - { - auto importedNodes = oh.get().as>(); - remoteNodes.reserve(remoteNodes.size() + importedNodes.size()); - remoteNodes.insert(remoteNodes.end(), importedNodes.begin(), importedNodes.end()); - } + remotePeers = sibs::DirectConnectionsUtils::deserializePeers(remoteNodesFileContent.data, remoteNodesFileContent.size); } static void sqlite_step_throw_on_failure(sqlite3 *db, sqlite3_stmt *stmt, const char *description) @@ -1025,29 +989,17 @@ namespace odhtdb decryptNodeData(nodeHash, nodeDecryptionKeyResult.second); } - struct RemoteNodePacker + const vector>& DatabaseStorage::getRemotePeers() const { - sibs::SafeSerializer serializer; - - RemoteNodePacker& write(const char *data, size_t size) - { - serializer.add((const u8*)data, size); - return *this; - } - }; - - const vector& DatabaseStorage::getRemoteNodes() const - { - return remoteNodes; + return remotePeers; } - void DatabaseStorage::setRemoteNodes(const std::vector &remoteNodes) + void DatabaseStorage::setRemotePeers(const std::vector> &remotePeers) { - Log::debug("Storing %u remote nodes", remoteNodes.size()); - this->remoteNodes = remoteNodes; - RemoteNodePacker remoteNodePacker; - msgpack::pack(remoteNodePacker, remoteNodes); - fileOverwrite(remoteNodesFilePath, DataView(remoteNodePacker.serializer.getBuffer().data(), remoteNodePacker.serializer.getBuffer().size())); + Log::debug("Storing %u remote peers", remotePeers.size()); + this->remotePeers = remotePeers; + std::vector serializedPeers = sibs::DirectConnectionsUtils::serializePeers(remotePeers); + fileOverwrite(remoteNodesFilePath, DataView(serializedPeers.data(), serializedPeers.size())); } vector DatabaseStorage::getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const @@ -1292,11 +1244,6 @@ namespace odhtdb return true; } - const dht::crypto::Identity& DatabaseStorage::getIdentity() const - { - return identity; - } - void DatabaseStorage::update() { auto time = chrono::high_resolution_clock::now().time_since_epoch(); diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp index 1508657..b84dbb7 100644 --- a/src/DhtKey.cpp +++ b/src/DhtKey.cpp @@ -8,24 +8,24 @@ namespace odhtdb firstByteOriginalValue = infoHash[0]; } - DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash) + DhtKey::DhtKey(const InfoHash &_infoHash) : infoHash(_infoHash) { firstByteOriginalValue = infoHash[0]; } - dht::InfoHash DhtKey::getNewDataListenerKey() + InfoHash DhtKey::getNewDataListenerKey() { infoHash[0] = firstByteOriginalValue; return infoHash; } - dht::InfoHash DhtKey::getRequestOldDataKey() + InfoHash DhtKey::getRequestOldDataKey() { infoHash[0] = firstByteOriginalValue + 1; return infoHash; } - dht::InfoHash DhtKey::getPingKey() + InfoHash DhtKey::getPingKey() { infoHash[0] = firstByteOriginalValue + 10; return infoHash; diff --git a/src/InfoHash.cpp b/src/InfoHash.cpp new file mode 100644 index 0000000..90e6e31 --- /dev/null +++ b/src/InfoHash.cpp @@ -0,0 +1,37 @@ +#include "../include/odhtdb/InfoHash.hpp" +#include +#include +#include + +namespace odhtdb +{ + InfoHash::InfoHash() + { + + } + + InfoHash::InfoHash(const u8 *data, const size_t size) : + key(data, size) + { + + } + + InfoHash InfoHash::generateHash(const u8 *data, const size_t size) + { + InfoHash infoHash; + int result = crypto_generichash_blake2b((unsigned char*)&infoHash.key.data[0], infoHash.key.data.size(), (const unsigned char*)data, size, nullptr, 0); + if(result < 0) + throw InfoHashException("Failed to hash data using blake2b"); + return infoHash; + } + + bool InfoHash::operator == (const InfoHash &other) const + { + return key == other.key; + } + + bool InfoHash::operator != (const InfoHash &other) const + { + return !(*this == other); + } +} \ No newline at end of file -- cgit v1.2.3