From 0e62cb8e5ed06d906ad84321cdda22acfcc952c9 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Fri, 9 Mar 2018 10:26:55 +0100 Subject: Partially implement 'add' operation --- src/Database.cpp | 247 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 139 insertions(+), 108 deletions(-) (limited to 'src/Database.cpp') diff --git a/src/Database.cpp b/src/Database.cpp index bcf5580..d01ff7d 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -30,8 +30,14 @@ const int OPENDHT_INFOHASH_LEN = 20; namespace odhtdb { - const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 0; - const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 0; + const u16 DATABASE_CREATE_PACKET_STRUCTURE_VERSION = 1; + const u16 DATABASE_ADD_PACKET_STRUCTURE_VERSION = 1; + + class RequestQuarantineException : public runtime_error + { + public: + RequestQuarantineException() : runtime_error("Request quarantine, will be processed later (can be real of fake request)") {} + }; DataView combine(sibs::SafeSerializer &headerSerializer, const Encryption &encryptedData) { @@ -43,12 +49,27 @@ namespace odhtdb return DataView(result, allocationSize); } - DatabaseCreateResponse::DatabaseCreateResponse(const shared_ptr &_key, const shared_ptr &_hash) : + DataView combine(const Signature::PublicKey &publicKey, const string &signedEncryptedData) + { + usize allocationSize = publicKey.getSize() + signedEncryptedData.size(); + char *result = new char[allocationSize]; + memcpy(result, publicKey.getData(), publicKey.getSize()); + memcpy(result + publicKey.getSize(), signedEncryptedData.data(), signedEncryptedData.size()); + return DataView(result, allocationSize); + } + + DatabaseCreateResponse::DatabaseCreateResponse(LocalUser *_nodeAdminUser, const shared_ptr &_key, const shared_ptr &_hash) : + nodeAdminUser(_nodeAdminUser), key(move(_key)), hash(_hash) { } + + const LocalUser* DatabaseCreateResponse::getNodeAdminUser() const + { + return nodeAdminUser; + } const shared_ptr DatabaseCreateResponse::getNodeEncryptionKey() const { @@ -133,13 +154,19 @@ namespace odhtdb node.listen(dhtKey.getNewDataListenerKey(), [this, hash, encryptionKey](const shared_ptr &value) { - return listenAddData(value, *hash, encryptionKey); + printf("Seed: New data listener received data...\n"); + const Hash requestHash(value->data.data(), value->data.size()); + if(requestHash == *hash) + return true; + //return listenCreateData(value, requestHash, encryptionKey); + else + return listenAddData(value, requestHash, encryptionKey); }); u8 responseKey[OPENDHT_INFOHASH_LEN]; randombytes_buf(responseKey, OPENDHT_INFOHASH_LEN); - // TODO: If this response key is spammed, generate a new one + // TODO: If this response key is spammed, generate a new one. node.listen(InfoHash(responseKey, OPENDHT_INFOHASH_LEN), [this, hash, encryptionKey](const shared_ptr &value) { const Hash requestHash(value->data.data(), value->data.size()); @@ -171,16 +198,20 @@ namespace odhtdb if(dataStartTimestamp == 0) { printf("Request: Sent create packet to requesting peer\n"); - node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value(requestedData->createData, requestedData->createDataSize), [](bool ok) + node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedData->data.data, requestedData->data.size), [](bool ok) { if(!ok) - fprintf(stderr, "Failed to put response for old data\n"); + fprintf(stderr, "Failed to put response for old data for 'create' data\n"); }); } - else + + for(auto requestedObject : requestedData->objects) { - assert(false); - printf("TODO: Send 'add' packets to requesting remote peer\n"); + node.put(InfoHash(requestResponseKey, OPENDHT_INFOHASH_LEN), Value((u8*)requestedObject->data.data, requestedObject->data.size), [](bool ok) + { + if(!ok) + fprintf(stderr, "Failed to put response for old data for 'add' data\n"); + }); } } catch (sibs::DeserializeException &e) @@ -203,39 +234,41 @@ namespace odhtdb //node.listen(ADD_DATA_HASH, bind(&Database::listenAddData, this, _1)); } - unique_ptr Database::create(const LocalUser *owner, const std::string &name) + unique_ptr Database::create(const std::string &ownerName, const std::string &nodeName) { + LocalUser *nodeAdminUser = LocalUser::create(Signature::KeyPair(), ownerName); + auto adminGroup = new Group("administrator"); + adminGroup->addUser(nodeAdminUser); + // Header sibs::SafeSerializer serializer; serializer.add(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); // Packet structure version // TODO: Append fractions to get real microseconds time u64 timestampMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; serializer.add(timestampMicroseconds); - serializer.add((u8*)owner->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES); + serializer.add((u8*)nodeAdminUser->getPublicKey().getData(), PUBLIC_KEY_NUM_BYTES); // Encrypted body sibs::SafeSerializer encryptedSerializer; - assert(owner->getName().size() <= 255); - encryptedSerializer.add((u8)owner->getName().size()); - encryptedSerializer.add((u8*)owner->getName().data(), owner->getName().size()); - assert(name.size() <= 255); - encryptedSerializer.add((u8)name.size()); - encryptedSerializer.add((u8*)name.data(), name.size()); + assert(nodeAdminUser->getName().size() <= 255); + encryptedSerializer.add((u8)nodeAdminUser->getName().size()); + encryptedSerializer.add((u8*)nodeAdminUser->getName().data(), nodeAdminUser->getName().size()); + assert(nodeName.size() <= 255); + encryptedSerializer.add((u8)nodeName.size()); + encryptedSerializer.add((u8*)nodeName.data(), nodeName.size()); try { Encryption encryptedBody(DataView(encryptedSerializer.getBuffer().data(), encryptedSerializer.getBuffer().size())); DataView requestData = combine(serializer, encryptedBody); shared_ptr hashRequestKey = make_shared(requestData.data, requestData.size); - auto adminGroup = new Group("administrator"); - adminGroup->addUser(owner); databaseStorage.createStorage(*hashRequestKey, adminGroup, timestampMicroseconds, (const u8*)requestData.data, requestData.size); - stagedCreateObjects.emplace_back(make_unique(requestData, hashRequestKey)); + stagedCreateObjects.emplace_back(make_unique(requestData, hashRequestKey)); assert(encryptedBody.getKey().size == KEY_BYTE_SIZE); char *key = new char[encryptedBody.getKey().size]; memcpy(key, encryptedBody.getKey().data, encryptedBody.getKey().size); - return make_unique(make_shared(key), hashRequestKey); + return make_unique(nodeAdminUser, make_shared(key), hashRequestKey); } catch (EncryptionException &e) { @@ -243,17 +276,22 @@ namespace odhtdb } } - void Database::add(const LocalUser *owner, const Key &key, DataView data) + void Database::add(const unique_ptr &nodeInfo, DataView dataToAdd) { -#if 0 - if(nodeEncryptionKeys.find(key) == nodeEncryptionKeys.end()) - throw DatabaseAddException("Data for key needs to be created before data can be appended to it"); - - unique_ptr signedData = make_unique(owner->getPrivateKey().sign(data)); + sibs::SafeSerializer serializer; + serializer.add(DATABASE_ADD_PACKET_STRUCTURE_VERSION); // TODO: Append fractions to get real microseconds time - u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; - stagedAddObjects.emplace_back(StagedAddObject(key, move(signedData), timeMicroseconds, owner->getPublicKey())); -#endif + u64 timestampMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + serializer.add(timestampMicroseconds); + + DataView encryptionKey(*nodeInfo->getNodeEncryptionKey(), KEY_BYTE_SIZE); + Encryption encryptedBody(dataToAdd, DataView(), encryptionKey); + DataView requestData = combine(serializer, encryptedBody); + string signedRequestData = nodeInfo->getNodeAdminUser()->getPrivateKey().sign(requestData); + free(requestData.data); + DataView stagedAddObject = combine(nodeInfo->getNodeAdminUser()->getPublicKey(), signedRequestData); + // TODO: Add add object to database storage here for local user + stagedAddObjects.emplace_back(make_unique(stagedAddObject, nodeInfo->getRequestHash())); } void Database::commit() @@ -268,6 +306,12 @@ namespace odhtdb { commitStagedCreateObject(stagedObject); } + + printf("Num objects to add: %zu\n", stagedAddObjects.size()); + for(const auto &stagedObject : stagedAddObjects) + { + commitStagedAddObject(stagedObject); + } } catch (exception &e) { @@ -276,24 +320,23 @@ namespace odhtdb for(const auto &stagedObject : stagedCreateObjects) { - free(stagedObject->encryptedBody.data); + free(stagedObject->data.data); } stagedCreateObjects.clear(); -#if 0 - printf("Num objects to add: %d\n", stagedAddObjects.size()); - for(StagedAddObject &stagedObject : stagedAddObjects) + + for(const auto &stagedObject : stagedAddObjects) { - commitStagedAddObject(stagedObject); + free(stagedObject->data.data); } stagedAddObjects.clear(); -#endif + // TODO: Add node.listen here to get notified when remote peers got the commit, then we can say we can return } - void Database::commitStagedCreateObject(const unique_ptr &stagedObject) + void Database::commitStagedCreateObject(const unique_ptr &stagedObject) { DhtKey dhtKey(*stagedObject->requestKey); - Value createDataValue((u8*)stagedObject->encryptedBody.data, stagedObject->encryptedBody.size); + Value createDataValue((u8*)stagedObject->data.data, stagedObject->data.size); node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok) { // TODO: Handle failure to put data @@ -302,48 +345,16 @@ namespace odhtdb }/* TODO: How to make this work?, time_point(), false*/); } - void Database::commitStagedAddObject(const DataView &stagedObject) + void Database::commitStagedAddObject(const unique_ptr &stagedObject) { -#if 0 - // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching) - sibs::SafeSerializer headerSerializer; - assert(stagedObject.key.hashedKey.size() == OPENDHT_INFOHASH_LEN); - headerSerializer.add(stagedObject.key.hashedKey.data(), OPENDHT_INFOHASH_LEN); - headerSerializer.add(stagedObject.timestamp); - - sibs::SafeSerializer bodySerializer; - bodySerializer.add((u8*)stagedObject.creatorPublicKey.getData(), PUBLIC_KEY_NUM_BYTES); - assert(stagedObject.data->size() < 0xFFFF - 120); - bodySerializer.add((u16)stagedObject.data->size()); - bodySerializer.add((u8*)stagedObject.data->data(), stagedObject.data->size()); - - EncryptedData encryptedData; - if(encrypt(&encryptedData, (EncryptionKey*)nodeEncryptionKeys[stagedObject.key], bodySerializer.getBuffer().data(), bodySerializer.getBuffer().size()) < 0) - throw CommitAddException("Failed to encrypt staged add object"); - - Blob serializedData; - combine(&serializedData, headerSerializer, encryptedData); - - // TODO: Verify if serializer buffer needs to survive longer than this scope - Value addDataValue(move(serializedData)); - node.put(ADD_DATA_HASH, move(addDataValue), [](bool ok) - { - // TODO: Handle failure to put data - if(!ok) - fprintf(stderr, "Failed to put for all: %s, what to do?\n", "commitStagedAddObject"); - }); - - // Post data for listeners of this key - /* - Value putKeyValue(serializer.getBuffer().data() + OPENDHT_INFOHASH_LEN, serializer.getBuffer().size() - OPENDHT_INFOHASH_LEN); - node.put(stagedObject.key.hashedKey, move(putKeyValue), [](bool ok) + DhtKey dhtKey(*stagedObject->requestKey); + Value createDataValue((u8*)stagedObject->data.data, stagedObject->data.size); + node.put(dhtKey.getNewDataListenerKey(), move(createDataValue), [](bool ok) { // TODO: Handle failure to put data if(!ok) - fprintf(stderr, "Failed to put for listeners: %s, what to do?\n", "commitStagedAddObject"); - }); - */ -#endif + fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject"); + }/* TODO: How to make this work?, time_point(), false*/); } ntp::NtpTimestamp Database::getSyncedTimestampUtc() const @@ -382,7 +393,7 @@ namespace odhtdb throw sibs::DeserializeException("Unsigned encrypted body is too small (unable to extract nonce)"); auto adminGroup = new Group("administrator"); - auto creatorUser = RemoteUser::create(userPublicKey, ""); // Username is encrypted, we dont know it... + auto creatorUser = RemoteUser::create(userPublicKey, "ENCRYPTED USER NAME"); // Username is encrypted, we dont know it... adminGroup->addUser(creatorUser); databaseStorage.createStorage(hash, adminGroup, creationDate, value->data.data(), value->data.size()); @@ -400,42 +411,61 @@ namespace odhtdb u8 nameLength = bodyDeserializer.extract(); string name; name.resize(nameLength); - bodyDeserializer.extract((u8*)&name[0], nameLength); + bodyDeserializer.extract((u8*)&name[0], nameLength); // TODO: Add this user name to storage added above return { creationDate, adminGroup, move(name) }; } DatabaseAddRequest Database::deserializeAddRequest(const std::shared_ptr &value, const Hash &hash, const shared_ptr encryptionKey) { - /* - StagedAddObject result; - sibs::SafeDeserializer deserializer(value->data.data(), value->data.size()); - u8 entryKeyRaw[OPENDHT_INFOHASH_LEN]; - deserializer.extract(entryKeyRaw, OPENDHT_INFOHASH_LEN); - result.key.hashedKey = InfoHash(entryKeyRaw, OPENDHT_INFOHASH_LEN); - result.timestamp = deserializer.extract(); - char creatorPublicKeyRaw[PUBLIC_KEY_NUM_BYTES]; deserializer.extract((u8*)creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); - Signature::PublicKey creatorPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); + Signature::PublicKey userPublicKey(creatorPublicKeyRaw, PUBLIC_KEY_NUM_BYTES); - u16 dataSize = deserializer.extract(); - if(dataSize < SIGNED_HASH_SIZE) - throw sibs::DeserializeException("Signed data is too small"); + DataView signedData((void*)deserializer.getBuffer(), deserializer.getSize()); + string unsignedData = userPublicKey.unsign(signedData); + sibs::SafeDeserializer deserializerUnsigned((u8*)unsignedData.data(), unsignedData.size()); - string signedData; - signedData.resize(dataSize); - deserializer.extract((u8*)&signedData[0], dataSize); - result.data = make_unique(); - result.data->resize(dataSize); - result.data = make_unique(creatorPublicKey.unsign(DataView((void*)signedData.data(), signedData.size()))); - - return result; - */ - Signature::PublicKey publicKey(nullptr, 0); - DataView d; - return { 0, 0, move(publicKey), d }; + u16 packetStructureVersion = deserializerUnsigned.extract(); + if(packetStructureVersion != DATABASE_CREATE_PACKET_STRUCTURE_VERSION) + { + string errMsg = "Received 'create' request with packet structure version "; + errMsg += to_string(packetStructureVersion); + errMsg += ", but our packet structure version is "; + errMsg += to_string(DATABASE_CREATE_PACKET_STRUCTURE_VERSION); + throw sibs::DeserializeException(errMsg); + } + + u64 creationDate = deserializerUnsigned.extract(); + // TODO: Append fractions to get real microseconds time + u64 timestampMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull; + if(creationDate > timestampMicroseconds) + throw sibs::DeserializeException("Packet is from the future"); + + if(deserializerUnsigned.getSize() < NONCE_BYTE_SIZE) + throw sibs::DeserializeException("Unsigned encrypted body is too small (unable to extract nonce)"); + + const Hash *node = databaseStorage.getNodeByUserPublicKey(userPublicKey); + if(!node) + { + // The user (public key) could belong to a node but we might not have retrieved the node info yet since data may + // not be retrieved in order. + // Data in quarantine is processed when 'create' packet is received or removed after 60 seconds + databaseStorage.addToQuarantine(userPublicKey, creationDate, value->data.data(), value->data.size()); + throw RequestQuarantineException(); + } + + auto creatorUser = databaseStorage.getUserByPublicKey(userPublicKey); + // TODO: Verify there isn't already data with same timestamp for this node. Same for quarantine + databaseStorage.appendStorage(*node, creatorUser, creationDate, value->data.data(), value->data.size()); + + u8 nonce[NONCE_BYTE_SIZE]; + deserializerUnsigned.extract(nonce, NONCE_BYTE_SIZE); + DataView dataToDecrypt((void*)deserializerUnsigned.getBuffer(), deserializerUnsigned.getSize()); + Decryption decryptedBody(dataToDecrypt, DataView(nonce, NONCE_BYTE_SIZE), DataView(*encryptionKey, KEY_BYTE_SIZE)); + + return { creationDate, creatorUser, move(decryptedBody) }; } bool Database::listenCreateData(std::shared_ptr value, const Hash &hash, const shared_ptr encryptionKey) @@ -460,11 +490,12 @@ namespace odhtdb printf("Got add data\n"); try { - if(databaseStorage.getStorage(hash)) - throw DatabaseStorageAlreadyExists("Add request hash is equal to hash already in storage (duplicate data?)"); - // TODO: Verify createObject timestamp is not in the future - //StagedAddObject addObject = deserializeAddRequest(value); - //DataView data((void*)addObject.data->data(), addObject.data->size()); + DatabaseAddRequest addObject = deserializeAddRequest(value, hash, encryptionKey); + printf("Got add object, timestamp: %zu\n", addObject.timestamp); + } + catch (RequestQuarantineException &e) + { + fprintf(stderr, "Warning: Request was put in quarantine, will be processed later"); } catch (exception &e) { -- cgit v1.2.3