Skip to content

Commit 5c056be

Browse files
authored
feat(replication): Aggregate data until threshold before replication (#5308)
Aggregate data until a threshold is reached before replicating it. The threshold can be set with the `replication_dispatch_threshold` flag; the default is 1500 bytes. For stalled data, a fiber wakes up every `10ms` and sends the data even if the threshold has not been reached. Signed-off-by: mkaruza <[email protected]>
1 parent 2a26df6 commit 5c056be

File tree

6 files changed

+106
-32
lines changed

6 files changed

+106
-32
lines changed

src/server/dflycmd.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,8 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
658658
DCHECK(shard);
659659
DCHECK(flow->conn);
660660

661-
flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES));
661+
flow->streamer.reset(
662+
new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES, true));
662663
flow->streamer->Start(flow->conn->socket());
663664

664665
// Register cleanup.

src/server/journal/pending_buf.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,9 @@ class PendingBuf {
3636
if (bufs_.back().buf.size() == Buf::kMaxBufSize) {
3737
bufs_.emplace_back();
3838
}
39-
auto& fron_buf = bufs_.back();
40-
41-
fron_buf.mem_size += str.size();
42-
fron_buf.buf.push_back(std::move(str));
39+
auto& front_buf = bufs_.back();
40+
front_buf.mem_size += str.size();
41+
front_buf.buf.push_back(std::move(str));
4342
}
4443

4544
// should be called to get the next buffer for sending
@@ -51,6 +50,10 @@ class PendingBuf {
5150
return bufs_.front();
5251
}
5352

53+
size_t FrontBufSize() const {
54+
return bufs_.front().mem_size;
55+
}
56+
5457
// should be called when the buf from PrepareSendingBuf() method was sent
5558
void Pop() {
5659
DCHECK(bufs_.size() >= 2);

src/server/journal/streamer.cc

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ ABSL_FLAG(uint32_t, replication_stream_output_limit, 64_KB,
2424
ABSL_FLAG(uint32_t, migration_buckets_serialization_threshold, 100,
2525
"The Number of buckets to serialize on each iteration before yielding");
2626

27+
ABSL_FLAG(uint32_t, replication_dispatch_threshold, 1500,
28+
"Number of bytes to aggregate before replication");
29+
2730
namespace dfly {
2831
using namespace util;
2932
using namespace journal;
@@ -36,13 +39,18 @@ iovec IoVec(io::Bytes src) {
3639

3740
uint32_t replication_stream_output_limit_cached = 64_KB;
3841
uint32_t migration_buckets_serialization_threshold_cached = 100;
42+
uint32_t replication_dispatch_threshold = 1500;
43+
uint32_t stalled_writer_base_period_ms = 10;
3944

4045
} // namespace
4146

42-
JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn)
43-
: cntx_(cntx), journal_(journal), send_lsn_(send_lsn) {
47+
JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn,
48+
bool is_stable_sync)
49+
: cntx_(cntx), journal_(journal), is_stable_sync_(is_stable_sync), send_lsn_(send_lsn) {
4450
// cache the flag to avoid accessing it later.
4551
replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
52+
replication_dispatch_threshold = absl::GetFlag(FLAGS_replication_dispatch_threshold);
53+
last_async_write_time_ = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
4654
}
4755

4856
JournalStreamer::~JournalStreamer() {
@@ -75,34 +83,80 @@ void JournalStreamer::Start(util::FiberSocketBase* dest) {
7583
CHECK(dest_ == nullptr && dest != nullptr);
7684
dest_ = dest;
7785
journal_cb_id_ = journal_->RegisterOnChange(this);
86+
StartStalledDataWriterFiber();
7887
}
7988

8089
void JournalStreamer::Cancel() {
8190
VLOG(1) << "JournalStreamer::Cancel";
8291
waker_.notifyAll();
8392
journal_->UnregisterOnChange(journal_cb_id_);
84-
if (!cntx_->IsError()) {
85-
WaitForInflightToComplete();
86-
}
93+
StopStalledDataWriterFiber();
94+
WaitForInflightToComplete();
8795
}
8896

8997
size_t JournalStreamer::UsedBytes() const {
9098
return pending_buf_.Size();
9199
}
92100

93-
void JournalStreamer::AsyncWrite() {
94-
DCHECK(!pending_buf_.Empty());
101+
void JournalStreamer::Write(std::string str) {
102+
DCHECK(!str.empty());
103+
DVLOG(3) << "Writing " << str.size() << " bytes";
104+
105+
pending_buf_.Push(std::move(str));
106+
AsyncWrite(false);
107+
}
108+
109+
void JournalStreamer::StartStalledDataWriterFiber() {
110+
if (is_stable_sync_ && !stalled_data_writer_.IsJoinable()) {
111+
auto pb = fb2::ProactorBase::me();
112+
std::chrono::milliseconds period_us(stalled_writer_base_period_ms);
113+
stalled_data_writer_ = MakeFiber([this, index = pb->GetPoolIndex(), period_us]() mutable {
114+
ThisFiber::SetName(absl::StrCat("fiber_periodic_journal_writer_", index));
115+
this->StalledDataWriterFiber(period_us, &stalled_data_writer_done_);
116+
});
117+
}
118+
}
119+
120+
void JournalStreamer::StalledDataWriterFiber(std::chrono::milliseconds period_ms,
121+
util::fb2::Done* waiter) {
122+
while (cntx_->IsRunning()) {
123+
if (waiter->WaitFor(period_ms)) {
124+
if (!cntx_->IsRunning()) {
125+
return;
126+
}
127+
}
128+
129+
// We don't want to force an async write to replicate if the last data
130+
// was written recently. Data needs to be stalled for the period_ms duration.
131+
if (!pending_buf_.Size() || in_flight_bytes_ > 0 ||
132+
((last_async_write_time_ + period_ms.count()) >
133+
(fb2::ProactorBase::GetMonotonicTimeNs() / 1000000))) {
134+
continue;
135+
}
95136

137+
AsyncWrite(true);
138+
}
139+
}
140+
141+
void JournalStreamer::AsyncWrite(bool force_send) {
142+
// Stable sync or RestoreStreamer replication can't write data until
143+
// the previous AsyncWrite has finished.
96144
if (in_flight_bytes_ > 0) {
97-
// We can not flush data while there are in flight requests because AsyncWrite
98-
// is not atomic. Therefore, we just aggregate.
145+
return;
146+
}
147+
148+
// Writes in stable sync that come from outside the stalled-data writer fiber need to check
149+
// threshold before writing data.
150+
if (is_stable_sync_ && !force_send &&
151+
pending_buf_.FrontBufSize() < replication_dispatch_threshold) {
99152
return;
100153
}
101154

102155
const auto& cur_buf = pending_buf_.PrepareSendingBuf();
103156

104157
in_flight_bytes_ = cur_buf.mem_size;
105158
total_sent_ += in_flight_bytes_;
159+
last_async_write_time_ = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
106160

107161
const auto v_size = cur_buf.buf.size();
108162
absl::InlinedVector<iovec, 8> v(v_size);
@@ -112,18 +166,8 @@ void JournalStreamer::AsyncWrite() {
112166
v[i] = IoVec(io::Bytes(uptr, cur_buf.buf[i].size()));
113167
}
114168

115-
dest_->AsyncWrite(v.data(), v.size(), [this, len = in_flight_bytes_](std::error_code ec) {
116-
OnCompletion(std::move(ec), len);
117-
});
118-
}
119-
120-
void JournalStreamer::Write(std::string str) {
121-
DCHECK(!str.empty());
122-
DVLOG(3) << "Writing " << str.size() << " bytes";
123-
124-
pending_buf_.Push(std::move(str));
125-
126-
AsyncWrite();
169+
dest_->AsyncWrite(v.data(), v.size(),
170+
[this, len = in_flight_bytes_](std::error_code ec) { OnCompletion(ec, len); });
127171
}
128172

129173
void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
@@ -136,7 +180,7 @@ void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
136180
if (ec) {
137181
cntx_->ReportError(ec);
138182
} else if (!pending_buf_.Empty()) {
139-
AsyncWrite();
183+
AsyncWrite(false);
140184
}
141185
}
142186

@@ -176,13 +220,22 @@ void JournalStreamer::WaitForInflightToComplete() {
176220
}
177221
}
178222

223+
void JournalStreamer::StopStalledDataWriterFiber() {
224+
if (is_stable_sync_ && stalled_data_writer_.IsJoinable()) {
225+
stalled_data_writer_done_.Notify();
226+
if (stalled_data_writer_.IsJoinable()) {
227+
stalled_data_writer_.Join();
228+
}
229+
}
230+
}
231+
179232
bool JournalStreamer::IsStalled() const {
180233
return pending_buf_.Size() >= replication_stream_output_limit_cached;
181234
}
182235

183236
RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
184237
ExecutionState* cntx)
185-
: JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO),
238+
: JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO, false),
186239
db_slice_(slice),
187240
my_slots_(std::move(slots)) {
188241
DCHECK(slice != nullptr);

src/server/journal/streamer.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ namespace dfly {
2121
class JournalStreamer : public journal::JournalConsumerInterface {
2222
public:
2323
enum class SendLsn { NO = 0, YES = 1 };
24-
JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn);
24+
JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn,
25+
bool is_stable_sync);
2526
virtual ~JournalStreamer();
2627

2728
// Self referential.
@@ -60,16 +61,26 @@ class JournalStreamer : public journal::JournalConsumerInterface {
6061
ExecutionState* cntx_;
6162

6263
private:
63-
void AsyncWrite();
64+
void AsyncWrite(bool force_send);
6465
void OnCompletion(std::error_code ec, size_t len);
6566

6667
bool IsStalled() const;
6768

6869
journal::Journal* journal_;
6970

71+
util::fb2::Fiber stalled_data_writer_;
72+
util::fb2::Done stalled_data_writer_done_;
73+
void StartStalledDataWriterFiber();
74+
void StopStalledDataWriterFiber();
75+
void StalledDataWriterFiber(std::chrono::milliseconds period_ms, util::fb2::Done* waiter);
76+
7077
PendingBuf pending_buf_;
7178

79+
// If we are replicating in stable sync, we can aggregate data before sending.
80+
bool is_stable_sync_;
7281
size_t in_flight_bytes_ = 0, total_sent_ = 0;
82+
// Last time data was sent, in milliseconds (monotonic clock).
83+
uint64_t last_async_write_time_ = 0;
7384
time_t last_lsn_time_ = 0;
7485
LSN last_lsn_writen_ = 0;
7586
util::fb2::EventCount waker_;

tests/dragonfly/cluster_test.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,7 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
835835
assert re.match(r"MOVED \d+ localhost:1111", e.value.args[0])
836836

837837
await push_config(replica_config, [c_master_admin])
838-
await asyncio.sleep(0.5)
838+
await check_all_replicas_finished([c_replica], c_master)
839839
assert await c_master.execute_command("dbsize") == 0
840840
assert await c_replica.execute_command("dbsize") == 0
841841

@@ -941,7 +941,7 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF
941941
"""
942942
await push_config(config, [c_master_admin, c_replica_admin])
943943

944-
await asyncio.sleep(0.5)
944+
await check_all_replicas_finished([c_replica], c_master)
945945

946946
assert await c_master.execute_command("dbsize") == (100_000 - slot_0_size)
947947
assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size)
@@ -1146,6 +1146,10 @@ async def test_random_keys():
11461146
assert await client.get(key) == "value"
11471147

11481148
await test_random_keys()
1149+
1150+
for i in range(3):
1151+
await check_all_replicas_finished([c_replicas[i]], c_masters_admin[i])
1152+
11491153
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
11501154

11511155
# Make sure that getting a value from a replica works as well.

tests/dragonfly/replication_test.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3016,6 +3016,8 @@ async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyIn
30163016
await seeder.stop(c_master)
30173017
await stream_task
30183018

3019+
await check_all_replicas_finished([c_replica], c_master)
3020+
30193021
# Check that everything is in sync
30203022
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master, c_replica]))
30213023
assert len(set(hashes)) == 1

0 commit comments

Comments
 (0)