aboutsummaryrefslogtreecommitdiff
path: root/src/Database.cpp
blob: 33aef546b4c46a057546122a8222c5b156870009 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include "../include/Database.hpp"
#include "../include/Group.hpp"
#include "../include/User.hpp"
#include <opendht.h>
#include <fmt/format.h>
#include <sodium/crypto_box_curve25519xchacha20poly1305.h>
#include <atomic>
#include <chrono>
#include <thread>
#include <sibs/SafeSerializer.hpp>

using namespace dht;
using namespace std;
using namespace chrono_literals;

// State shared by every Database instance in the process and by the detached NTP sync thread.
// The counter, the offset and the flag are accessed from more than one thread without a lock
// (the sync thread writes them while constructors, destructors and getSyncedTimestampUtc read
// or write them), so they are atomics: unsynchronized access to plain variables would be a
// data race, and the constructor's wait loop polls timestampSynced.

// Number of live Database instances. The sync thread keeps looping while this is above zero.
static std::atomic<int> databaseCount{0};
// TODO: Verify time_t is always signed
// Local time minus NTP time, in seconds. Updated by comparing local time with ntp server
static std::atomic<time_t> timeOffset{0};
// Most recently created sync thread object. The thread itself is detached right after creation.
static std::thread *ntpThread = nullptr;
// True while timeOffset holds a value obtained from the NTP server
static std::atomic<bool> timestampSynced{false};

namespace odhtdb
{
    // Starts the DHT node on `port`, bootstraps it against `bootstrapNodeAddr` (using the same
    // port number, passed as text) and then blocks until the process-wide clock offset has been
    // obtained from the NTP server at least once.
    //
    // The first live Database instance also launches the detached time-sync thread. That thread
    // queries "pool.ntp.org" once per minute for as long as at least one Database exists, and
    // clears the synced flag when it leaves its loop.
    //
    // NOTE(review): there is no timeout on the final wait; if the NTP query never completes this
    // constructor does not return - confirm that is acceptable.
    Database::Database(const char *bootstrapNodeAddr, u16 port)
    {
        node.run(port, dht::crypto::generateIdentity(), true);

        // bootstrap() is given the port as a C string, so format the number first
        fmt::MemoryWriter portText;
        portText << port;
        node.bootstrap(bootstrapNodeAddr, portText.c_str());

        // TODO: Make this work for multiple threads initializing database at same time
        const bool isFirstDatabase = (++databaseCount == 1);
        if(isFirstDatabase)
        {
            // Deleting a null pointer is a no-op, so a previous thread object (if any) can be
            // released unconditionally before it is replaced
            delete ntpThread;
            ntpThread = new thread([]()
            {
                ntp::NtpClient ntpClient("pool.ntp.org");
                while(databaseCount > 0)
                {
                    const ntp::NtpTimestamp serverTime = ntpClient.getTimestamp();
                    const time_t localNow = time(nullptr);
                    // Offset is "local minus server"; getSyncedTimestampUtc subtracts it again
                    timeOffset = localNow - serverTime.seconds;
                    timestampSynced = true;
                    // TODO: Also use timestamp fraction (milliseconds)
                    this_thread::sleep_for(60s);
                }
                timestampSynced = false;
            });

            // TODO: Catch std::system_error instead of this if-statement
            if(ntpThread->joinable())
                ntpThread->detach();
        }

        // Poll until the sync thread has published an offset
        while(!timestampSynced)
            this_thread::sleep_for(10ms);
    }

    // Unregisters this instance from the process-wide instance counter and then waits for the
    // DHT node to shut down. The time-sync thread started by the constructor loops only while
    // the counter is above zero, so it leaves its loop the next time it checks the counter
    // after the last instance is gone.
    Database::~Database()
    {
        // TODO: Make this work for multiple threads removing database object at same time
        databaseCount -= 1;
        node.join();
    }

    void Database::create(const Key &key, Group *primaryAdminGroup)
    {
        // TODO: Append fractions to get real microseconds time
        u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
        stagedCreateObjects.emplace_back(StagedCreateObject{ key, primaryAdminGroup, timeMicroseconds });
    }

    void Database::add(const Key &key, DataView data)
    {
        // TODO: Append fractions to get real microseconds time
        u64 timeMicroseconds = ((u64)getSyncedTimestampUtc().seconds) * 1000000ull;
        stagedAddObjects.emplace_back(StagedAddObject{ key, data, timeMicroseconds });
    }

    void Database::commit()
    {
        // TODO: Combine staged objects into one object for efficiency.
        // TODO: Add rollback

        for(StagedCreateObject &stagedObject : stagedCreateObjects)
        {
            commitStagedCreateObject(stagedObject);
        }
        stagedCreateObjects.clear();

        for(StagedAddObject &stagedObject : stagedAddObjects)
        {
            commitStagedAddObject(stagedObject);
        }
        stagedAddObjects.clear();
    }

    void Database::commitStagedCreateObject(const StagedCreateObject &stagedObject)
    {
        // TODO: Use (ed25519 or poly1305) and curve25519
        // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
        sibs::SafeSerializer serializer;
        serializer.add(stagedObject.key.hashedKey.data(), stagedObject.key.hashedKey.size());
        serializer.add((u8)stagedObject.primaryAdminGroup->getName().size());
        serializer.add((u8*)stagedObject.primaryAdminGroup->getName().data(), stagedObject.primaryAdminGroup->getName().size());
        serializer.add((u32)stagedObject.primaryAdminGroup->getUsers().size());
        for(User *user : stagedObject.primaryAdminGroup->getUsers())
        {
            serializer.add((u8)user->getName().size());
            serializer.add((u8*)user->getName().data(), user->getName().size());
        }
        
        // TODO: Verify if serializer buffer needs to survive longer than this method
        Value value(serializer.getBuffer().data(), serializer.getBuffer().size());
        node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedCreateObject");
        }, time_point(), false);
    }

    void Database::commitStagedAddObject(const StagedAddObject &stagedObject)
    {
        // TODO: Use (ed25519 or poly1305) and curve25519
        // TODO: Implement gas and price (refill when serving content (seeding) or by waiting. This is done to prevent spamming and bandwidth leeching)
        Value value((const u8*)stagedObject.data.data, stagedObject.data.size);
        node.put(stagedObject.key.hashedKey, move(value), [](bool ok)
        {
            // TODO: Handle failure to put data
            if(!ok)
                fprintf(stderr, "Failed to put: %s, what to do?\n", "commitStagedAddObject");
        }, time_point(), false);
    }

    // Returns the current time corrected by the offset measured against the NTP server: the
    // local clock reading minus the stored "local minus server" offset. Only whole seconds are
    // filled in; the fractional part is always zero for now.
    //
    // Asserts that a sync has happened. With assertions compiled out the stored offset is used
    // as-is, whatever its current value.
    ntp::NtpTimestamp Database::getSyncedTimestampUtc() const
    {
        assert(timestampSynced);
        const time_t localNow = time(nullptr);

        ntp::NtpTimestamp syncedNow;
        syncedNow.fractions = 0; // TODO: Set this
        syncedNow.seconds = localNow - timeOffset;
        return syncedNow;
    }
}