Skip to content

Commit 08803e6

Browse files
authored
feat(server): New auto-journal types/read/write (dragonflydb#560)
1 parent 3d610ee commit 08803e6

File tree

10 files changed

+542
-62
lines changed

10 files changed

+542
-62
lines changed

src/facade/facade_types.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,23 @@
44

55
#pragma once
66

7-
#include <absl/types/span.h>
87
#include <absl/container/flat_hash_map.h>
8+
#include <absl/types/span.h>
9+
10+
#include <string>
911

1012
namespace facade {
1113

12-
enum class Protocol : uint8_t {
13-
MEMCACHE = 1,
14-
REDIS = 2
15-
};
14+
enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };
1615

1716
using MutableSlice = absl::Span<char>;
1817
using CmdArgList = absl::Span<MutableSlice>;
1918
using CmdArgVec = std::vector<MutableSlice>;
2019

20+
inline std::string_view ToSV(MutableSlice slice) {
21+
return std::string_view{slice.data(), slice.size()};
22+
}
23+
2124
struct CmdArgListFormatter {
2225
void operator()(std::string* out, MutableSlice arg) const {
2326
out->append(absl::StrCat("`", std::string_view(arg.data(), arg.size()), "`"));
@@ -66,7 +69,6 @@ constexpr inline unsigned long long operator""_KB(unsigned long long x) {
6669

6770
} // namespace facade
6871

69-
7072
namespace std {
7173
ostream& operator<<(ostream& os, facade::CmdArgList args);
7274

src/server/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
1818
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
1919
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
2020
set_family.cc stream_family.cc string_family.cc
21-
zset_family.cc version.cc bitops_family.cc container_utils.cc)
21+
zset_family.cc version.cc bitops_family.cc container_utils.cc
22+
serializer_commons.cc journal/serializer.cc)
2223

2324
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
2425
absl::random_random TRDP::jsoncons zstd TRDP::lz4)
@@ -40,9 +41,10 @@ cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
4041
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
4142
cxx_test(snapshot_test dragonfly_lib LABELS DFLY)
4243
cxx_test(json_family_test dfly_test_lib LABELS DFLY)
44+
cxx_test(journal_test dfly_test_lib LABELS DFLY)
4345

4446

4547
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
4648
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
47-
generic_family_test memcache_parser_test rdb_test
49+
generic_family_test memcache_parser_test rdb_test journal_test
4850
redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test)

src/server/journal/serializer.cc

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2022, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "server/journal/serializer.h"
6+
7+
#include "base/io_buf.h"
8+
#include "base/logging.h"
9+
#include "io/io.h"
10+
#include "server/common.h"
11+
#include "server/error.h"
12+
#include "server/journal/types.h"
13+
#include "server/main_service.h"
14+
#include "server/serializer_commons.h"
15+
#include "server/transaction.h"
16+
17+
using namespace std;
18+
19+
namespace dfly {
20+
21+
JournalWriter::JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid)
22+
: sink_{sink}, cur_dbid_{dbid} {
23+
}
24+
25+
error_code JournalWriter::Write(uint64_t v) {
26+
uint8_t buf[10];
27+
unsigned len = WritePackedUInt(v, buf);
28+
return sink_->Write(io::Bytes{buf, len});
29+
}
30+
31+
error_code JournalWriter::Write(std::string_view sv) {
32+
RETURN_ON_ERR(Write(sv.size()));
33+
return sink_->Write(io::Buffer(sv));
34+
}
35+
36+
error_code JournalWriter::Write(CmdArgList args) {
37+
RETURN_ON_ERR(Write(args.size()));
38+
for (auto v : args)
39+
RETURN_ON_ERR(Write(facade::ToSV(v)));
40+
41+
return std::error_code{};
42+
}
43+
44+
error_code JournalWriter::Write(std::pair<std::string_view, ArgSlice> args) {
45+
auto [cmd, tail_args] = args;
46+
47+
RETURN_ON_ERR(Write(1 + tail_args.size()));
48+
RETURN_ON_ERR(Write(cmd));
49+
for (auto v : tail_args)
50+
RETURN_ON_ERR(Write(v));
51+
52+
return std::error_code{};
53+
}
54+
55+
error_code JournalWriter::Write(std::monostate) {
56+
return std::error_code{};
57+
}
58+
59+
error_code JournalWriter::Write(const journal::EntryNew& entry) {
60+
// Check if entry has a new db index and we need to emit a SELECT entry.
61+
if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
62+
RETURN_ON_ERR(Write(journal::EntryNew{journal::Op::SELECT, entry.dbid}));
63+
cur_dbid_ = entry.dbid;
64+
}
65+
66+
RETURN_ON_ERR(Write(uint8_t(entry.opcode)));
67+
68+
switch (entry.opcode) {
69+
case journal::Op::SELECT:
70+
return Write(entry.dbid);
71+
case journal::Op::VAL:
72+
RETURN_ON_ERR(Write(entry.txid));
73+
return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload);
74+
default:
75+
break;
76+
};
77+
return std::error_code{};
78+
}
79+
80+
JournalReader::JournalReader(io::Source* source, DbIndex dbid)
81+
: source_{source}, buf_{}, dbid_{dbid} {
82+
}
83+
84+
template <typename UT> io::Result<UT> ReadPackedUIntTyped(io::Source* source) {
85+
uint64_t v;
86+
SET_OR_UNEXPECT(ReadPackedUInt(source), v);
87+
if (v > std::numeric_limits<UT>::max())
88+
return make_unexpected(make_error_code(errc::result_out_of_range));
89+
return static_cast<UT>(v);
90+
}
91+
92+
io::Result<uint8_t> JournalReader::ReadU8() {
93+
return ReadPackedUIntTyped<uint8_t>(source_);
94+
}
95+
96+
io::Result<uint16_t> JournalReader::ReadU16() {
97+
return ReadPackedUIntTyped<uint16_t>(source_);
98+
}
99+
100+
io::Result<uint64_t> JournalReader::ReadU64() {
101+
return ReadPackedUIntTyped<uint64_t>(source_);
102+
}
103+
104+
io::Result<size_t> JournalReader::ReadString() {
105+
size_t size = 0;
106+
SET_OR_UNEXPECT(ReadU64(), size);
107+
108+
buf_.EnsureCapacity(size);
109+
auto dest = buf_.AppendBuffer().first(size);
110+
uint64_t read = 0;
111+
SET_OR_UNEXPECT(source_->Read(dest), read);
112+
113+
buf_.CommitWrite(read);
114+
if (read != size)
115+
return make_unexpected(std::make_error_code(std::errc::message_size));
116+
117+
return size;
118+
}
119+
120+
std::error_code JournalReader::Read(CmdArgVec* vec) {
121+
buf_.ConsumeInput(buf_.InputBuffer().size());
122+
123+
size_t size = 0;
124+
SET_OR_RETURN(ReadU64(), size);
125+
126+
vec->resize(size);
127+
for (auto& span : *vec) {
128+
size_t len;
129+
SET_OR_RETURN(ReadString(), len);
130+
span = MutableSlice{nullptr, len};
131+
}
132+
133+
size_t offset = 0;
134+
for (auto& span : *vec) {
135+
size_t len = span.size();
136+
auto ptr = buf_.InputBuffer().subspan(offset).data();
137+
span = MutableSlice{reinterpret_cast<char*>(ptr), len};
138+
offset += len;
139+
}
140+
141+
return std::error_code{};
142+
}
143+
144+
io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
145+
uint8_t opcode;
146+
SET_OR_UNEXPECT(ReadU8(), opcode);
147+
148+
journal::ParsedEntry entry{static_cast<journal::Op>(opcode), dbid_};
149+
150+
switch (entry.opcode) {
151+
case journal::Op::VAL:
152+
SET_OR_UNEXPECT(ReadU64(), entry.txid);
153+
entry.payload = CmdArgVec{};
154+
if (auto ec = Read(&*entry.payload); ec)
155+
return make_unexpected(ec);
156+
break;
157+
case journal::Op::SELECT:
158+
SET_OR_UNEXPECT(ReadU16(), dbid_);
159+
return ReadEntry();
160+
default:
161+
break;
162+
};
163+
return entry;
164+
}
165+
166+
} // namespace dfly

src/server/journal/serializer.h

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2022, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <optional>
8+
#include <string>
9+
10+
#include "base/io_buf.h"
11+
#include "io/io.h"
12+
#include "server/common.h"
13+
#include "server/journal/types.h"
14+
15+
namespace dfly {
16+
17+
// JournalWriter serializes journal entries to a sink.
18+
// It automatically keeps track of the current database index.
19+
class JournalWriter {
20+
public:
21+
// Initialize with sink and optional start database index. If no start index is set,
22+
// a SELECT will be issued before the first entry.
23+
JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid = std::nullopt);
24+
25+
// Write single entry.
26+
std::error_code Write(const journal::EntryNew& entry);
27+
28+
private:
29+
std::error_code Write(uint64_t v); // Write packed unsigned integer.
30+
std::error_code Write(std::string_view sv); // Write string.
31+
std::error_code Write(CmdArgList args);
32+
std::error_code Write(std::pair<std::string_view, ArgSlice> args);
33+
34+
std::error_code Write(std::monostate); // Overload for empty std::variant
35+
36+
private:
37+
io::Sink* sink_;
38+
std::optional<DbIndex> cur_dbid_;
39+
};
40+
41+
// JournalReader allows deserializing journal entries from a source.
42+
// Like the writer, it automatically keeps track of the database index.
43+
struct JournalReader {
44+
public:
45+
// Initialize with source and start database index.
46+
JournalReader(io::Source* source, DbIndex dbid);
47+
48+
// Try reading entry from source.
49+
io::Result<journal::ParsedEntry> ReadEntry();
50+
51+
private:
52+
// TODO: Templated endian encoding to not repeat...?
53+
io::Result<uint8_t> ReadU8();
54+
io::Result<uint16_t> ReadU16();
55+
io::Result<uint64_t> ReadU64();
56+
57+
// Read string into internal buffer and return size.
58+
io::Result<size_t> ReadString();
59+
60+
// Read argument array into internal buffer and build slice.
61+
// TODO: Inline store span data inside buffer to avoid alloaction
62+
std::error_code Read(CmdArgVec* vec);
63+
64+
private:
65+
io::Source* source_;
66+
base::IoBuf buf_;
67+
DbIndex dbid_;
68+
};
69+
70+
} // namespace dfly

src/server/journal/types.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
//
44
#pragma once
55

6+
#include <string>
7+
#include <variant>
8+
69
#include "server/common.h"
710
#include "server/table.h"
811

@@ -16,6 +19,7 @@ enum class Op : uint8_t {
1619
LOCK_SHARD = 3,
1720
UNLOCK_SHARD = 4,
1821
SCHED = 5,
22+
SELECT = 6,
1923
VAL = 10,
2024
DEL,
2125
MSET,
@@ -44,6 +48,46 @@ struct Entry {
4448
uint64_t expire_ms = 0; // 0 means no expiry.
4549
};
4650

51+
struct EntryBase {
52+
TxId txid;
53+
Op opcode;
54+
DbIndex dbid;
55+
};
56+
57+
// This struct represents a single journal entry.
58+
// Those are either control instructions or commands.
59+
struct EntryNew : public EntryBase { // Called this "New" because I can't delete the old neither
60+
// replace it partially
61+
// Payload represents a non-owning view into a command executed on the shard.
62+
using Payload =
63+
std::variant<std::monostate, // No payload.
64+
CmdArgList, // Parts of a full command.
65+
std::pair<std::string_view, ArgSlice> // Command and its shard parts.
66+
>;
67+
68+
EntryNew(TxId txid, DbIndex dbid, Payload pl)
69+
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} {
70+
}
71+
72+
EntryNew(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
73+
}
74+
75+
Payload payload;
76+
};
77+
78+
struct ParsedEntry : public EntryBase {
79+
using Payload = std::optional<CmdArgVec>;
80+
81+
ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
82+
}
83+
84+
ParsedEntry(TxId txid, DbIndex dbid, Payload pl)
85+
: EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} {
86+
}
87+
88+
Payload payload;
89+
};
90+
4791
using ChangeCallback = std::function<void(const Entry&)>;
4892

4993
} // namespace journal

0 commit comments

Comments
 (0)