Skip to content

Commit 2386b02

Browse files
authored
feat(server): Use new journal format (dragonflydb#563)
1 parent cdc31fc commit 2386b02

30 files changed

+302
-179
lines changed

src/facade/conn_context.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ class ConnectionContext {
5050
bool req_auth : 1;
5151
bool replica_conn : 1;
5252
bool authenticated : 1;
53-
bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber.
53+
bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber.
54+
bool journal_emulated : 1; // whether it is used to dispatch journal commands.
5455

5556
private:
5657
Connection* owner_;

src/facade/facade.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
118118
replica_conn = false;
119119
authenticated = false;
120120
force_dispatch = false;
121+
journal_emulated = false;
121122
}
122123

123124
RedisReplyBuilder* ConnectionContext::operator->() {

src/server/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc
1919
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
2020
set_family.cc stream_family.cc string_family.cc
2121
zset_family.cc version.cc bitops_family.cc container_utils.cc
22-
serializer_commons.cc journal/serializer.cc)
22+
serializer_commons.cc journal/serializer.cc journal/executor.cc)
2323

2424
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
2525
absl::random_random TRDP::jsoncons zstd TRDP::lz4)

src/server/common.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,10 @@ class Context : protected Cancellation {
268268
//
269269
// Note: this function blocks when called from inside an error handler.
270270
template <typename... T> GenericError Error(T... ts) {
271-
std::lock_guard lk{mu_};
271+
if (!mu_.try_lock()) // TODO: Maybe use two separate locks.
272+
return GenericError{std::forward<T>(ts)...};
273+
274+
std::lock_guard lk{mu_, std::adopt_lock};
272275
if (err_)
273276
return err_;
274277

src/server/dflycmd.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "server/engine_shard_set.h"
1414
#include "server/error.h"
1515
#include "server/journal/journal.h"
16+
#include "server/journal/serializer.h"
1617
#include "server/rdb_save.h"
1718
#include "server/script_mgr.h"
1819
#include "server/server_family.h"
@@ -353,16 +354,16 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
353354
// Register journal listener and cleanup.
354355
uint32_t cb_id = 0;
355356
if (shard != nullptr) {
356-
cb_id = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
357-
// TODO: Serialize event.
358-
ReqSerializer serializer{flow->conn->socket()};
359-
serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
360-
});
357+
JournalWriter writer{flow->conn->socket()};
358+
auto journal_cb = [flow, writer = std::move(writer)](const journal::Entry& je) mutable {
359+
writer.Write(je);
360+
};
361+
cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb));
361362
}
362363

363364
flow->cleanup = [flow, this, cb_id]() {
364365
if (cb_id)
365-
sf_->journal()->Unregister(cb_id);
366+
sf_->journal()->UnregisterOnChange(cb_id);
366367
flow->TryShutdownSocket();
367368
};
368369

src/server/journal/executor.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2022, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#include "server/journal/executor.h"
6+
7+
#include "base/logging.h"
8+
#include "server/main_service.h"
9+
10+
namespace dfly {
11+
12+
JournalExecutor::JournalExecutor(Service* service) : service_{service} {
13+
}
14+
15+
void JournalExecutor::Execute(journal::ParsedEntry&& entry) {
16+
if (entry.payload) {
17+
io::NullSink null_sink;
18+
ConnectionContext conn_context{&null_sink, nullptr};
19+
conn_context.is_replicating = true;
20+
conn_context.journal_emulated = true;
21+
conn_context.conn_state.db_index = entry.dbid;
22+
23+
auto span = CmdArgList{entry.payload->data(), entry.payload->size()};
24+
25+
service_->DispatchCommand(span, &conn_context);
26+
}
27+
}
28+
29+
} // namespace dfly

src/server/journal/executor.h

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2022, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include "server/journal/types.h"
8+
9+
namespace dfly {
10+
11+
class Service;
12+
13+
// JournalExecutor allows executing journal entries.
14+
class JournalExecutor {
15+
public:
16+
JournalExecutor(Service* service);
17+
void Execute(journal::ParsedEntry&& entry);
18+
19+
private:
20+
Service* service_;
21+
};
22+
23+
} // namespace dfly

src/server/journal/journal.cc

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,11 @@ error_code Journal::OpenInThread(bool persistent, string_view dir) {
4141
}
4242
}
4343

44-
ServerState::tlocal()->set_journal(this);
45-
EngineShard* shard = EngineShard::tlocal();
46-
if (shard) {
47-
shard->set_journal(this);
48-
}
44+
ServerState::tlocal()->set_journal(this);
45+
EngineShard* shard = EngineShard::tlocal();
46+
if (shard) {
47+
shard->set_journal(this);
48+
}
4949

5050
return ec;
5151
}
@@ -83,16 +83,16 @@ uint32_t Journal::RegisterOnChange(ChangeCallback cb) {
8383
return journal_slice.RegisterOnChange(cb);
8484
}
8585

86-
void Journal::Unregister(uint32_t id) {
87-
journal_slice.Unregister(id);
86+
void Journal::UnregisterOnChange(uint32_t id) {
87+
journal_slice.UnregisterOnChange(id);
8888
}
8989

9090
bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
9191
if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed))
9292
return false;
9393

94-
// TODO: to complete the metadata.
95-
journal_slice.AddLogRecord(Entry::Sched(txid));
94+
// TODO: Handle tx entries.
95+
// journal_slice.AddLogRecord(Entry::Sched(txid));
9696

9797
return true;
9898
}

src/server/journal/journal.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ class Transaction;
1313

1414
namespace journal {
1515

16-
1716
class Journal {
1817
public:
1918
using Span = absl::Span<const std::string_view>;
@@ -32,9 +31,8 @@ class Journal {
3231

3332
//******* The following functions must be called in the context of the owning shard *********//
3433

35-
3634
uint32_t RegisterOnChange(ChangeCallback cb);
37-
void Unregister(uint32_t id);
35+
void UnregisterOnChange(uint32_t id);
3836

3937
// Returns true if transaction was scheduled, false if journal is inactive
4038
// or in lameduck mode and does not log new transactions.
@@ -58,7 +56,6 @@ class Journal {
5856
void RecordEntry(const Entry& entry);
5957

6058
private:
61-
6259
mutable boost::fibers::mutex state_mu_;
6360

6461
std::atomic_bool lameduck_{false};

src/server/journal/journal_slice.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
147147
return id;
148148
}
149149

150-
void JournalSlice::Unregister(uint32_t id) {
150+
void JournalSlice::UnregisterOnChange(uint32_t id) {
151151
CHECK(!iterating_cb_arr_);
152152

153153
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),

0 commit comments

Comments
 (0)