diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Database.cpp | 326 | ||||
-rw-r--r-- | src/DatabaseStorage.cpp | 69 | ||||
-rw-r--r-- | src/DhtKey.cpp | 8 | ||||
-rw-r--r-- | src/InfoHash.cpp | 37 |
4 files changed, 172 insertions, 268 deletions
diff --git a/src/Database.cpp b/src/Database.cpp index 564b3d6..2a7a510 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -5,7 +5,6 @@ #include "../include/odhtdb/bin2hex.hpp" #include "../include/odhtdb/Log.hpp" #include <boost/uuid/uuid_generators.hpp> -#include <opendht.h> #include <sodium/randombytes.h> #include <thread> #include <chrono> @@ -14,7 +13,6 @@ #include <cassert> #include <sys/time.h> -using namespace dht; using namespace std; using namespace chrono_literals; @@ -25,8 +23,6 @@ static odhtdb::u64 timeOffsetFraction = 0; static thread *ntpThread = nullptr; static bool timestampSynced = false; -const int OPENDHT_INFOHASH_LEN = 20; - namespace odhtdb { static boost::uuids::random_generator uuidGen; @@ -34,24 +30,6 @@ namespace odhtdb const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; const u16 DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION = 1; - const int NODE_PUT_RETRY_TIMES = 3; - - static void nodePutWithRetry(dht::DhtRunner *node, const dht::InfoHash &infoHash, shared_ptr<dht::Value> value, int retryCounter = 0) - { - node->put(infoHash, value, [node, infoHash, value, retryCounter](bool ok) - { - if(!ok) - { - if(retryCounter < NODE_PUT_RETRY_TIMES) - { - Log::warn("Failed to execute node.put, retrying (%d/%d)", 1 + retryCounter, NODE_PUT_RETRY_TIMES); - nodePutWithRetry(node, infoHash, value, retryCounter + 1); - } - else - Log::error("Failed to execute node.put with %d retries, stopping...", NODE_PUT_RETRY_TIMES); - } - }); - } class RequestQuarantineException : public runtime_error { @@ -108,34 +86,13 @@ namespace odhtdb } Database::Database(const char *bootstrapNodeAddr, u16 port, const boost::filesystem::path &storageDir, DatabaseCallbackFuncs callbackFuncs) : + bootstrapConnection(sibs::Ipv4(bootstrapNodeAddr, port)), databaseStorage(this, storageDir), onCreateNodeCallbackFunc(callbackFuncs.createNodeCallbackFunc), onAddNodeCallbackFunc(callbackFuncs.addNodeCallbackFunc), onAddUserCallbackFunc(callbackFuncs.addUserCallbackFunc), shuttingDown(false) { - node.run(port , { - /*.dht_config = */{ - /*.node_config = */{ - /*.node_id = */{}, - /*.network = */0, - /*.is_bootstrap = */false, - /*.maintain_storage*/false - }, - /*.id = */databaseStorage.getIdentity() - }, - /*.threaded = */true, - /*.proxy_server = */"", - /*.push_node_id = */"" - }); - node.setStorageLimit(1024 * 1024 * 1); // 1 Megabyte - auto portStr = to_string(port); - node.bootstrap(bootstrapNodeAddr, portStr.c_str()); - const auto &remoteNodes = databaseStorage.getRemoteNodes(); - if(!remoteNodes.empty()) - node.bootstrap(remoteNodes); - Log::debug("Connecting to bootstrap node (%s) and %u other known nodes that we have connected to previously with port %d", bootstrapNodeAddr, remoteNodes.size(), port); - // TODO: Make this work for multiple threads initializing database at same time ++databaseCount; if(databaseCount == 1) @@ -207,7 +164,8 @@ namespace odhtdb if(shuttingDown) return; } - databaseStorage.setRemoteNodes(node.exportNodes()); + + databaseStorage.setRemotePeers(bootstrapConnection.getPeers()); saveIntervalMs = 30000; // 30 sec } }); @@ -219,7 +177,6 @@ namespace odhtdb --databaseCount; shuttingDown = true; remoteNodesSaveThread.join(); - node.join(); } struct ActionGap @@ -228,78 +185,80 @@ namespace odhtdb u64 range; }; - void Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const shared_ptr<InfoHash> requestResponseInfoHash, const shared_ptr<Value> value, usize valueOffset) + bool Database::sendOldDataToPeer(const DatabaseNode nodeToSeed, const void *data, const usize size) { Log::debug("Request: Got request to send old data"); - try + + sibs::SafeDeserializer deserializer((const u8*)data, size); + u16 requestStructureVersion = deserializer.extract<u16>(); + if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) { - sibs::SafeDeserializer deserializer(value->data.data() + valueOffset, value->data.size() - valueOffset); - bool userWantsCreateNode = deserializer.extract<u8>() == 1; - DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>(); - - if(userWantsCreateNode) + Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); + return true; + } + shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>(); + deserializer.extract(&(*requestResponseInfoHash)[0], sibs::PUBSUB_KEY_LENGTH); + + bool userWantsCreateNode = deserializer.extract<u8>() == 1; + DatabaseFetchOrder fetchOrder = deserializer.extract<DatabaseFetchOrder>(); + + if(userWantsCreateNode) + { + Log::debug("Request: Peer wants CreateNode"); + databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) { - Log::debug("Request: Peer wants CreateNode"); - databaseStorage.fetchNodeRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash](const DataView rawData) - { - Log::debug("Request: Sent create packet to requesting peer"); - shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - }); - } - - vector<unique_ptr<u8[]>> userPublicKeys; + Log::debug("Request: Sent create packet to requesting peer"); + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + }); + } + + vector<unique_ptr<u8[]>> userPublicKeys; + + // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants + DataViewMap<vector<ActionGap>> actionGaps; + while(!deserializer.empty()) + { + unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); + deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + u64 actionGapStart = deserializer.extract<u64>(); + u64 actionGapRange = deserializer.extract<u64>(); - // TODO(Performance improvement): Sort actions by gap start and do a binary search to check if raw data is the packet the peer wants - DataViewMap<vector<ActionGap>> actionGaps; - while(!deserializer.empty()) + DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); + actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); + userPublicKeys.emplace_back(move(userPublicKeyRaw)); + } + + if(actionGaps.empty()) + Log::debug("No action gaps received, sending all data"); + + // TODO(Performance improvement): Instead of sending several packets, combine them into one + databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + { + bool sendData = false; + auto actionGapsIt = actionGaps.find(creatorPublicKey); + if(actionGapsIt == actionGaps.end()) { - unique_ptr<u8[]> userPublicKeyRaw(new u8[PUBLIC_KEY_NUM_BYTES]); - deserializer.extract(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - u64 actionGapStart = deserializer.extract<u64>(); - u64 actionGapRange = deserializer.extract<u64>(); - - DataView userPublicKey(userPublicKeyRaw.get(), PUBLIC_KEY_NUM_BYTES); - actionGaps[userPublicKey].push_back({ actionGapStart, actionGapRange }); - userPublicKeys.emplace_back(move(userPublicKeyRaw)); + Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); + sendData = true; } - - if(actionGaps.empty()) - Log::debug("No action gaps received, sending all data"); - - // TODO(Performance improvement): Instead of sending several packets, combine them into one - databaseStorage.fetchNodeAddDataRaw(*nodeToSeed.getRequestHash(), [this, requestResponseInfoHash, &actionGaps](const DataView rawData, const DataView creatorPublicKey, u64 actionCounter) + else { - bool sendData = false; - auto actionGapsIt = actionGaps.find(creatorPublicKey); - if(actionGapsIt == actionGaps.end()) + for(const auto &userActionGaps : actionGapsIt->second) { - Log::debug("No action gap received for user %s, sending data", bin2hex((const char*)creatorPublicKey.data, creatorPublicKey.size).c_str()); - sendData = true; - } - else - { - for(const auto &userActionGaps : actionGapsIt->second) + if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) { - if(actionCounter >= userActionGaps.start && actionCounter <= userActionGaps.start + userActionGaps.range) - { - Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); - sendData = true; - break; - } + Log::debug("Node action counter %llu is requested by peer (%llu - %llu)", actionCounter, userActionGaps.start, userActionGaps.start + userActionGaps.range); + sendData = true; + break; } } - - if(!sendData) return; - shared_ptr<Value> value = make_shared<Value>((u8*)rawData.data, rawData.size); - nodePutWithRetry(&node, *requestResponseInfoHash, value); - this_thread::sleep_for(chrono::milliseconds(50)); - }, fetchOrder); - } - catch (std::exception &e) - { - Log::warn("Failed while serving peer, error: %s", e.what()); - } + } + + if(!sendData) return; + bootstrapConnection.put(requestResponseInfoHash->getKey(), rawData.data, rawData.size); + this_thread::sleep_for(chrono::milliseconds(50)); + }, fetchOrder); + return true; } void Database::seed(const DatabaseNode &nodeToSeed, DatabaseFetchOrder fetchOrder) @@ -322,93 +281,59 @@ namespace odhtdb Log::debug("Seeding key: %s", nodeToSeed.getRequestHash()->toString().c_str()); DhtKey dhtKey(*nodeToSeed.getRequestHash()); - auto newDataListenerFuture = node.listen(dhtKey.getNewDataListenerKey(), [this, nodeToSeed](const shared_ptr<Value> value) + newSeedInfo.newDataListenHandle = bootstrapConnection.listen(dhtKey.getNewDataListenerKey().getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + Log::debug("Seed: New data listener received data..."); - const Hash requestHash(value->data.data(), value->data.size()); + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) return true; - //return listenCreateData(value, requestHash, encryptionKey); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.newDataListenerFuture = make_shared<future<size_t>>(move(newDataListenerFuture)); - u8 responseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN); - shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, OPENDHT_INFOHASH_LEN);; + u8 responseKey[sibs::PUBSUB_KEY_LENGTH]; + randombytes_buf(responseKey, sibs::PUBSUB_KEY_LENGTH); + shared_ptr<InfoHash> responseKeyShared = make_shared<InfoHash>(responseKey, sibs::PUBSUB_KEY_LENGTH); newSeedInfo.reponseKeyInfoHash = responseKeyShared; // TODO: If this response key is spammed, generate a new one. - auto responseKeyFuture = node.listen(*responseKeyShared, [this, nodeToSeed](const shared_ptr<Value> value) + newSeedInfo.responseKeyListenHandle = bootstrapConnection.listen(responseKeyShared->getKey(), [this, nodeToSeed](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - if(value->data.size() == OPENDHT_INFOHASH_LEN) - { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - InfoHash pingResponseKey; - deserializer.extract(pingResponseKey.data(), OPENDHT_INFOHASH_LEN); - shared_ptr<Value> responseValue = make_shared<Value>(); - nodePutWithRetry(&node, pingResponseKey, responseValue); + if(!peer) return true; - } - - const Hash requestHash(value->data.data(), value->data.size()); + + const Hash requestHash(data, size); if(requestHash == *nodeToSeed.getRequestHash()) - return listenCreateData(value, requestHash, nodeToSeed.getNodeEncryptionKey()); + return listenCreateData(data, size, requestHash, nodeToSeed.getNodeEncryptionKey()); else - return listenAddData(value, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); + return listenAddData(data, size, requestHash, nodeToSeed.getRequestHash(), nodeToSeed.getNodeEncryptionKey()); }); - newSeedInfo.responseKeyFuture = make_shared<future<size_t>>(move(responseKeyFuture)); // TODO:!!! Before listening on this key, we should check how many remote peers are also providing this data. // This is to prevent too many peers from responding to a request to get old data. - auto requestOldDataListenerFuture = node.listen(dhtKey.getRequestOldDataKey(), [this, nodeToSeed, responseKeyShared](const shared_ptr<Value> value) + newSeedInfo.requestOldDataListenHandle = bootstrapConnection.listen(dhtKey.getRequestOldDataKey().getKey(), [this, nodeToSeed, responseKeyShared](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { + if(!peer) + return true; + try { - static_assert(HASH_LEN == OPENDHT_INFOHASH_LEN, "Wrong hashlen size, did it change with opendht upgrade?"); - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u16 requestStructureVersion = deserializer.extract<u16>(); - if(requestStructureVersion != DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION) - { - Log::warn("Request: structure is version %d but we are at version %d, ignoring request", requestStructureVersion, DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - return true; - } - shared_ptr<InfoHash> requestResponseInfoHash = make_shared<InfoHash>(); - deserializer.extract(requestResponseInfoHash->data(), OPENDHT_INFOHASH_LEN); - if(*responseKeyShared == *requestResponseInfoHash) - { - Log::debug("Request: Ignorning request for old data from ourself"); - return true; - } - else - Log::debug("Request: Got request from somebody else"); - - u8 pingResponseKey[OPENDHT_INFOHASH_LEN]; - randombytes_buf(pingResponseKey, OPENDHT_INFOHASH_LEN); - InfoHash pingResponseKeyInfoHash(pingResponseKey, OPENDHT_INFOHASH_LEN); - usize valueOffset = value->data.size() - deserializer.getSize(); - node.listen(pingResponseKeyInfoHash, [this, value, requestResponseInfoHash, nodeToSeed, valueOffset](const shared_ptr<Value> _) - { - sendOldDataToPeer(nodeToSeed, requestResponseInfoHash, value, valueOffset); - return false; - }); - - shared_ptr<Value> pingRequest = make_shared<Value>(pingResponseKey, OPENDHT_INFOHASH_LEN); - nodePutWithRetry(&node, *requestResponseInfoHash, pingRequest); + return sendOldDataToPeer(nodeToSeed, data, size); } catch (std::exception &e) { Log::warn("Failed while serving peer, error: %s", e.what()); + return true; } - return true; }); - newSeedInfo.requestOldDataListenerFuture = make_shared<future<size_t>>(move(requestOldDataListenerFuture)); seedInfoMap[*nodeToSeed.getRequestHash()] = newSeedInfo; sibs::SafeSerializer serializer; serializer.add(DATABASE_REQUEST_OLD_DATA_STRUCTURE_VERSION); - serializer.add(responseKey, OPENDHT_INFOHASH_LEN); + serializer.add(responseKey, sibs::PUBSUB_KEY_LENGTH); bool iHaveCreateNode = databaseStorage.doesNodeExist(*nodeToSeed.getRequestHash()); serializer.add(iHaveCreateNode ? (u8)0 : (u8)1); serializer.add(fetchOrder); @@ -440,8 +365,7 @@ namespace odhtdb } Log::debug("Sending request for old data"); - shared_ptr<Value> requestValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getRequestOldDataKey(), requestValue); + bootstrapConnection.put(dhtKey.getRequestOldDataKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } void Database::stopSeeding(const Hash &nodeHash) @@ -449,11 +373,10 @@ namespace odhtdb auto seedInfoIt = seedInfoMap.find(nodeHash); if(seedInfoIt != seedInfoMap.end()) { - // TODO: Verify if doing get on listener future stalls program forever... Opendht documentation is not clear on this DhtKey dhtKey(nodeHash); - node.cancelListen(dhtKey.getNewDataListenerKey(), seedInfoIt->second.newDataListenerFuture->get()); - node.cancelListen(dhtKey.getRequestOldDataKey(), seedInfoIt->second.requestOldDataListenerFuture->get()); - node.cancelListen(*seedInfoIt->second.reponseKeyInfoHash, seedInfoIt->second.responseKeyFuture->get()); + bootstrapConnection.cancelListen(seedInfoIt->second.newDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.requestOldDataListenHandle); + bootstrapConnection.cancelListen(seedInfoIt->second.responseKeyListenHandle); seedInfoMap.erase(seedInfoIt); } } @@ -488,9 +411,11 @@ namespace odhtdb databaseStorage.setNodeDecryptionKey(*hashRequestKey, DataView(encryptionKey->data, encryptionKey->size)); databaseStorage.createStorage(*hashRequestKey, creatorKeyPair->getPublicKey(), DataView(adminGroupId.data, adminGroupId.size()), timestampCombined, (const u8*)serializer.getBuffer().data(), serializer.getBuffer().size()); + DatabaseNode databaseNode = { encryptionKey, hashRequestKey }; + seed(databaseNode, DatabaseFetchOrder::OLDEST_FIRST); + DhtKey dhtKey(*hashRequestKey); - shared_ptr<Value> createDataValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), createDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); shared_ptr<OwnedByteArray> adminGroupIdResponse = make_shared<OwnedByteArray>(new u8[GROUP_ID_LENGTH], GROUP_ID_LENGTH); memcpy(adminGroupIdResponse->data, adminGroupId.data, GROUP_ID_LENGTH); @@ -522,8 +447,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_DATA, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, encryptedDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } void Database::addUser(const DatabaseNode &nodeInfo, const Signature::KeyPair &userToPerformActionWith, const Signature::PublicKey &userToAddPublicKey, const DataView &groupToAddUserTo) @@ -551,8 +475,7 @@ namespace odhtdb databaseStorage.appendStorage(*nodeInfo.getRequestHash(), requestDataHash, DatabaseOperation::ADD_USER, newActionCounter, userToPerformActionWith.getPublicKey(), timestampCombined, (u8*)stagedAddObject.data, stagedAddObject.size, additionalDataView); DhtKey dhtKey(*nodeInfo.getRequestHash()); - shared_ptr<Value> addDataValue = make_shared<Value>((u8*)stagedAddObject.data, stagedAddObject.size); - nodePutWithRetry(&node, dhtKey.getNewDataListenerKey(), addDataValue); + bootstrapConnection.put(dhtKey.getNewDataListenerKey().getKey(), stagedAddObject.data, stagedAddObject.size); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() @@ -571,9 +494,9 @@ namespace odhtdb return timestamp; } - void Database::deserializeCreateRequest(const shared_ptr<dht::Value> &value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) + void Database::deserializeCreateRequest(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); u16 packetStructureVersion = deserializer.extract<u16>(); if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION) { @@ -605,12 +528,12 @@ namespace odhtdb uint8_t adminGroupId[GROUP_ID_LENGTH]; deserializer.extract(adminGroupId, GROUP_ID_LENGTH); - databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, value->data.data(), value->data.size()); + databaseStorage.createStorage(hash, userPublicKey, DataView(adminGroupId, GROUP_ID_LENGTH), creationDate, data, size); } - void Database::deserializeAddRequest(const shared_ptr<dht::Value> &value, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) + void Database::deserializeAddRequest(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> &nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) { - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); + sibs::SafeDeserializer deserializer((const u8*)data, size); char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); @@ -647,10 +570,10 @@ namespace odhtdb u64 newActionCounter = deserializerUnsigned.extract<u64>(); DataView additionalDataView((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); - databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, value->data.data(), value->data.size(), additionalDataView); + databaseStorage.appendStorage(*nodeHash, requestDataHash, operation, newActionCounter, creatorPublicKey, creationDate, data, size, additionalDataView); } - bool Database::listenCreateData(shared_ptr<dht::Value> value, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) + bool Database::listenCreateData(const void *data, const usize size, const Hash &hash, const shared_ptr<OwnedByteArray> encryptionKey) { Log::debug("Got create data in node %s", hash.toString().c_str()); try @@ -659,7 +582,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesNodeExist(hash)) throw DatabaseStorageAlreadyExists("Create request hash is equal to hash already in storage (duplicate data?)"); - deserializeCreateRequest(value, hash, encryptionKey); + deserializeCreateRequest(data, size, hash, encryptionKey); } catch (exception &e) { @@ -668,7 +591,7 @@ namespace odhtdb return true; } - bool Database::listenAddData(shared_ptr<dht::Value> value, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) + bool Database::listenAddData(const void *data, const usize size, const Hash &requestDataHash, const std::shared_ptr<Hash> nodeHash, const shared_ptr<OwnedByteArray> encryptionKey) { Log::debug("Got add data in node %s", nodeHash->toString().c_str()); try @@ -677,7 +600,7 @@ namespace odhtdb // the database has constraint to deal with this in multi-threaded way if(databaseStorage.doesDataExist(requestDataHash)) throw DatabaseStorageAlreadyExists("Add data request hash is equal to hash already in storage (duplicate data?)"); - deserializeAddRequest(value, requestDataHash, nodeHash, encryptionKey); + deserializeAddRequest(data, size, requestDataHash, nodeHash, encryptionKey); } catch (RequestQuarantineException &e) { @@ -720,47 +643,44 @@ namespace odhtdb return databaseStorage.getUserLowestPermissionLevel(nodeHash, userPublicKey); } - std::future<size_t> Database::receiveCustomMessage(const dht::InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::receiveCustomMessage(const InfoHash &receiveMessageKey, ReceiveCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = receiveMessageKey; + InfoHash responseKey = receiveMessageKey; ++responseKey[0]; - return node.listen(receiveMessageKey, [callbackFunc, this, responseKey](const shared_ptr<Value> value) + return bootstrapConnection.listen(receiveMessageKey.getKey(), [callbackFunc, this, responseKey](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - sibs::SafeSerializer serializer = callbackFunc(value->data.data(), value->data.size()); + sibs::SafeSerializer serializer = callbackFunc(data, size); if(!serializer.getBuffer().empty()) { - shared_ptr<Value> responseValue = make_shared<Value>(move(serializer.getBuffer())); - nodePutWithRetry(&node, responseKey, responseValue); + bootstrapConnection.put(responseKey.getKey(), serializer.getBuffer().data(), serializer.getBuffer().size()); } return true; }); } - void Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data) + void Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size) { - shared_ptr<Value> value = make_shared<Value>(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); } - std::future<size_t> Database::sendCustomMessage(const dht::InfoHash &requestKey, vector<u8> &&data, SendCustomMessageCallbackFunc callbackFunc) + sibs::ListenHandle Database::sendCustomMessage(const InfoHash &requestKey, const void *data, const usize size, SendCustomMessageCallbackFunc callbackFunc) { - dht::InfoHash responseKey = requestKey; + InfoHash responseKey = requestKey; ++responseKey[0]; - auto listener = node.listen(responseKey, [callbackFunc](const shared_ptr<Value> value) + auto listener = bootstrapConnection.listen(responseKey.getKey(), [callbackFunc](const sibs::DirectConnectionPeer *peer, const void *data, const usize size) { - return callbackFunc(true, value->data.data(), value->data.size()); + return callbackFunc(true, data, size); }); - shared_ptr<Value> value = make_shared<Value>(move(data)); - nodePutWithRetry(&node, requestKey, value); + bootstrapConnection.put(requestKey.getKey(), data, size); return listener; } - void Database::cancelNodeListener(const dht::InfoHash &infoHash, std::future<size_t> &nodeListener) + void Database::cancelNodeListener(sibs::ListenHandle &nodeListener) { - node.cancelListen(infoHash, nodeListener.get()); + bootstrapConnection.cancelListen(nodeListener); } int Database::clearCache() @@ -768,8 +688,8 @@ namespace odhtdb return databaseStorage.clearCache(); } - dht::InfoHash Database::getInfoHash(const void *data, usize size) + InfoHash Database::getInfoHash(const void *data, usize size) { - return dht::InfoHash::get((const u8*)data, size); + return InfoHash::generateHash((const u8*)data, size); } } diff --git a/src/DatabaseStorage.cpp b/src/DatabaseStorage.cpp index 9398254..14b74e8 100644 --- a/src/DatabaseStorage.cpp +++ b/src/DatabaseStorage.cpp @@ -158,18 +158,6 @@ namespace odhtdb metadataSerializer.add(STORAGE_VERSION); randombytes_buf(passwordSalt, PASSWORD_SALT_LEN); metadataSerializer.add(passwordSalt, PASSWORD_SALT_LEN); - - identity = dht::crypto::generateIdentity(); - dht::Blob privateKeyData = identity.first->serialize(); - metadataSerializer.add((u16)privateKeyData.size()); - metadataSerializer.add(privateKeyData.data(), privateKeyData.size()); - - dht::Blob certificateData; - identity.second->pack(certificateData); - metadataSerializer.add((u16)certificateData.size()); - metadataSerializer.add(certificateData.data(), certificateData.size()); - - fileAppend(metadataFilePath, { metadataSerializer.getBuffer().data(), metadataSerializer.getBuffer().size() }); } catch(sibs::DeserializeException &e) { @@ -335,37 +323,13 @@ namespace odhtdb throw std::runtime_error("Wrong storage version!"); deserializer.extract(passwordSalt, PASSWORD_SALT_LEN); - - u16 privateKeySize = deserializer.extract<u16>(); - dht::Blob privateKeyRaw; - privateKeyRaw.resize(privateKeySize); - deserializer.extract(&privateKeyRaw[0], privateKeySize); - identity.first = make_shared<dht::crypto::PrivateKey>(privateKeyRaw); - - u16 certificateSize = deserializer.extract<u16>(); - dht::Blob certificateRaw; - certificateRaw.resize(certificateSize); - deserializer.extract(&certificateRaw[0], certificateSize); - identity.second = make_shared<dht::crypto::Certificate>(certificateRaw); - assert(deserializer.empty()); } void DatabaseStorage::loadRemoteNodesFromFile() { OwnedByteArray remoteNodesFileContent = fileGetContent(remoteNodesFilePath); - msgpack::unpacker pac; - pac.reserve_buffer(remoteNodesFileContent.size); - memcpy(pac.buffer(), remoteNodesFileContent.data, remoteNodesFileContent.size); - pac.buffer_consumed(remoteNodesFileContent.size); - - msgpack::object_handle oh; - while(pac.next(oh)) - { - auto importedNodes = oh.get().as<vector<dht::NodeExport>>(); - remoteNodes.reserve(remoteNodes.size() + importedNodes.size()); - remoteNodes.insert(remoteNodes.end(), importedNodes.begin(), importedNodes.end()); - } + remotePeers = sibs::DirectConnectionsUtils::deserializePeers(remoteNodesFileContent.data, remoteNodesFileContent.size); } static void sqlite_step_throw_on_failure(sqlite3 *db, sqlite3_stmt *stmt, const char *description) @@ -1025,29 +989,17 @@ namespace odhtdb decryptNodeData(nodeHash, nodeDecryptionKeyResult.second); } - struct RemoteNodePacker + const vector<std::shared_ptr<sibs::DirectConnectionPeer>>& DatabaseStorage::getRemotePeers() const { - sibs::SafeSerializer serializer; - - RemoteNodePacker& write(const char *data, size_t size) - { - serializer.add((const u8*)data, size); - return *this; - } - }; - - const vector<dht::NodeExport>& DatabaseStorage::getRemoteNodes() const - { - return remoteNodes; + return remotePeers; } - void DatabaseStorage::setRemoteNodes(const std::vector<dht::NodeExport> &remoteNodes) + void DatabaseStorage::setRemotePeers(const std::vector<std::shared_ptr<sibs::DirectConnectionPeer>> &remotePeers) { - Log::debug("Storing %u remote nodes", remoteNodes.size()); - this->remoteNodes = remoteNodes; - RemoteNodePacker remoteNodePacker; - msgpack::pack(remoteNodePacker, remoteNodes); - fileOverwrite(remoteNodesFilePath, DataView(remoteNodePacker.serializer.getBuffer().data(), remoteNodePacker.serializer.getBuffer().size())); + Log::debug("Storing %u remote peers", remotePeers.size()); + this->remotePeers = remotePeers; + std::vector<u8> serializedPeers = sibs::DirectConnectionsUtils::serializePeers(remotePeers); + fileOverwrite(remoteNodesFilePath, DataView(serializedPeers.data(), serializedPeers.size())); } vector<OwnedByteArray> DatabaseStorage::getUserGroups(const Hash &nodeHash, const Signature::PublicKey &userPublicKey) const @@ -1292,11 +1244,6 @@ namespace odhtdb return true; } - const dht::crypto::Identity& DatabaseStorage::getIdentity() const - { - return identity; - } - void DatabaseStorage::update() { auto time = chrono::high_resolution_clock::now().time_since_epoch(); diff --git a/src/DhtKey.cpp b/src/DhtKey.cpp index 1508657..b84dbb7 100644 --- a/src/DhtKey.cpp +++ b/src/DhtKey.cpp @@ -8,24 +8,24 @@ namespace odhtdb firstByteOriginalValue = infoHash[0]; } - DhtKey::DhtKey(const dht::InfoHash &_infoHash) : infoHash(_infoHash) + DhtKey::DhtKey(const InfoHash &_infoHash) : infoHash(_infoHash) { firstByteOriginalValue = infoHash[0]; } - dht::InfoHash DhtKey::getNewDataListenerKey() + InfoHash DhtKey::getNewDataListenerKey() { infoHash[0] = firstByteOriginalValue; return infoHash; } - dht::InfoHash DhtKey::getRequestOldDataKey() + InfoHash DhtKey::getRequestOldDataKey() { infoHash[0] = firstByteOriginalValue + 1; return infoHash; } - dht::InfoHash DhtKey::getPingKey() + InfoHash DhtKey::getPingKey() { infoHash[0] = firstByteOriginalValue + 10; return infoHash; diff --git a/src/InfoHash.cpp b/src/InfoHash.cpp new file mode 100644 index 0000000..90e6e31 --- /dev/null +++ b/src/InfoHash.cpp @@ -0,0 +1,37 @@ +#include "../include/odhtdb/InfoHash.hpp" +#include <sodium/crypto_generichash_blake2b.h> +#include <sodium/core.h> +#include <algorithm> + +namespace odhtdb +{ + InfoHash::InfoHash() + { + + } + + InfoHash::InfoHash(const u8 *data, const size_t size) : + key(data, size) + { + + } + + InfoHash InfoHash::generateHash(const u8 *data, const size_t size) + { + InfoHash infoHash; + int result = crypto_generichash_blake2b((unsigned char*)&infoHash.key.data[0], infoHash.key.data.size(), (const unsigned char*)data, size, nullptr, 0); + if(result < 0) + throw InfoHashException("Failed to hash data using blake2b"); + return infoHash; + } + + bool InfoHash::operator == (const InfoHash &other) const + { + return key == other.key; + } + + bool InfoHash::operator != (const InfoHash &other) const + { + return !(*this == other); + } +}
\ No newline at end of file |