@@ -109,9 +109,9 @@ void LogTcpSocketDiagnostics(util::FiberSocketBase* dest) {
109
109
110
110
} // namespace
111
111
112
- JournalStreamer::JournalStreamer (journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn,
113
- bool is_stable_sync )
114
- : cntx_(cntx), journal_(journal), is_stable_sync_(is_stable_sync), send_lsn_(send_lsn ) {
112
+ JournalStreamer::JournalStreamer (journal::Journal* journal, ExecutionState* cntx,
113
+ JournalStreamer::Config config )
114
+ : cntx_(cntx), journal_(journal), config_(config ) {
115
115
// cache the flag to avoid accessing it later.
116
116
replication_stream_output_limit_cached = absl::GetFlag (FLAGS_replication_stream_output_limit);
117
117
migration_buckets_sleep_usec_cached = absl::GetFlag (FLAGS_migration_buckets_sleep_usec);
@@ -136,7 +136,7 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) {
136
136
time_t now = time (nullptr );
137
137
last_lsn_writen_ = item.journal_item .lsn ;
138
138
// TODO: to chain it to the previous Write call.
139
- if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3 ) {
139
+ if (config_. should_sent_lsn && now - last_lsn_time_ > 3 ) {
140
140
last_lsn_time_ = now;
141
141
io::StringSink sink;
142
142
JournalWriter writer (&sink);
@@ -148,14 +148,19 @@ void JournalStreamer::ConsumeJournalChange(const JournalChangeItem& item) {
148
148
void JournalStreamer::Start (util::FiberSocketBase* dest) {
149
149
CHECK (dest_ == nullptr && dest != nullptr );
150
150
dest_ = dest;
151
- journal_cb_id_ = journal_->RegisterOnChange (this );
151
+ // For partial sync we first catch up from journal replication buffer and only then register.
152
+ if (config_.start_partial_sync_at == 0 ) {
153
+ journal_cb_id_ = journal_->RegisterOnChange (this );
154
+ }
152
155
StartStalledDataWriterFiber ();
153
156
}
154
157
155
158
void JournalStreamer::Cancel () {
156
159
VLOG (1 ) << " JournalStreamer::Cancel " << cntx_->IsCancelled ();
157
160
waker_.notifyAll ();
158
- journal_->UnregisterOnChange (journal_cb_id_);
161
+ if (journal_cb_id_) {
162
+ journal_->UnregisterOnChange (journal_cb_id_);
163
+ }
159
164
StopStalledDataWriterFiber ();
160
165
WaitForInflightToComplete ();
161
166
}
@@ -182,7 +187,7 @@ void JournalStreamer::Write(std::string str) {
182
187
}
183
188
184
189
void JournalStreamer::StartStalledDataWriterFiber () {
185
- if (is_stable_sync_ && !stalled_data_writer_.IsJoinable ()) {
190
+ if (config_. init_from_stable_sync && !stalled_data_writer_.IsJoinable ()) {
186
191
auto pb = fb2::ProactorBase::me ();
187
192
std::chrono::milliseconds period_us (stalled_writer_base_period_ms);
188
193
stalled_data_writer_ = MakeFiber ([this , index = pb->GetPoolIndex (), period_us]() mutable {
@@ -192,8 +197,55 @@ void JournalStreamer::StartStalledDataWriterFiber() {
192
197
}
193
198
}
194
199
200
+ bool JournalStreamer::MaybePartialStreamLSNs () {
201
+ // Same algorithm as SwitchIncrementalFb. The only difference is that we don't sent
202
+ // the old LSN"s via a snapshot but rather as journal changes.
203
+ if (config_.start_partial_sync_at > 0 ) {
204
+ LSN lsn = config_.start_partial_sync_at ;
205
+ DCHECK_LE (lsn, journal_->GetLsn ()) << " The replica tried to sync from the future." ;
206
+
207
+ LOG (INFO) << " Starting partial sync from lsn: " << lsn;
208
+ // The replica sends the LSN of the next entry is wants to receive.
209
+ while (cntx_->IsRunning () && journal_->IsLSNInBuffer (lsn)) {
210
+ JournalChangeItem item;
211
+ item.journal_item .data = journal_->GetEntry (lsn);
212
+ item.journal_item .lsn = lsn;
213
+ ConsumeJournalChange (item);
214
+ lsn++;
215
+ }
216
+
217
+ if (!cntx_->IsRunning ()) {
218
+ return false ;
219
+ }
220
+
221
+ if (journal_->GetLsn () != lsn) {
222
+ // We stopped but we didn't manage to send the whole stream.
223
+ cntx_->ReportError (
224
+ std::make_error_code (errc::state_not_recoverable),
225
+ absl::StrCat (" Partial sync was unsuccessful because entry #" , lsn,
226
+ " was dropped from the buffer. Current lsn=" , journal_->GetLsn ()));
227
+ return false ;
228
+ }
229
+
230
+ // We are done, register back to the journal so we don't miss any changes
231
+ journal_cb_id_ = journal_->RegisterOnChange (this );
232
+
233
+ LOG (INFO) << " Last LSN sent in partial sync was " << (lsn - 1 );
234
+ // flush pending
235
+ if (pending_buf_.Size () != 0 ) {
236
+ AsyncWrite (true );
237
+ }
238
+ }
239
+ return true ;
240
+ }
241
+
195
242
void JournalStreamer::StalledDataWriterFiber (std::chrono::milliseconds period_ms,
196
243
util::fb2::Done* waiter) {
244
+ if (!MaybePartialStreamLSNs ()) {
245
+ // Either context got cancelled, or partial sync failed because the lsn's stalled.
246
+ return ;
247
+ }
248
+
197
249
while (cntx_->IsRunning ()) {
198
250
if (waiter->WaitFor (period_ms)) {
199
251
if (!cntx_->IsRunning ()) {
@@ -222,7 +274,7 @@ void JournalStreamer::AsyncWrite(bool force_send) {
222
274
223
275
// Writing in stable sync and outside of fiber needs to check
224
276
// threshold before writing data.
225
- if (is_stable_sync_ && !force_send &&
277
+ if (config_. init_from_stable_sync && !force_send &&
226
278
pending_buf_.FrontBufSize () < replication_dispatch_threshold) {
227
279
return ;
228
280
}
@@ -332,7 +384,7 @@ void JournalStreamer::WaitForInflightToComplete() {
332
384
}
333
385
334
386
void JournalStreamer::StopStalledDataWriterFiber () {
335
- if (is_stable_sync_ && stalled_data_writer_.IsJoinable ()) {
387
+ if (config_. init_from_stable_sync && stalled_data_writer_.IsJoinable ()) {
336
388
stalled_data_writer_done_.Notify ();
337
389
if (stalled_data_writer_.IsJoinable ()) {
338
390
stalled_data_writer_.Join ();
@@ -346,9 +398,7 @@ bool JournalStreamer::IsStalled() const {
346
398
347
399
RestoreStreamer::RestoreStreamer (DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
348
400
ExecutionState* cntx)
349
- : JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO, false ),
350
- db_slice_ (slice),
351
- my_slots_(std::move(slots)) {
401
+ : JournalStreamer(journal, cntx, {}), db_slice_(slice), my_slots_(std::move(slots)) {
352
402
DCHECK (slice != nullptr );
353
403
migration_buckets_serialization_threshold_cached =
354
404
absl::GetFlag (FLAGS_migration_buckets_serialization_threshold);
0 commit comments