Skip to content

Commit 424b513

Browse files
committed
MB-58961: Remove AS::maxScanSeqno
The member is set by DCPBackfill<type> instances for being used in the final AS::completeBackfill calls. Remove the member and just pass that information by arg onto the ::completeBackfill() call. No logic change. Change-Id: If1995392fa4f03bfdf850ac2a9aafd11e390a05a Reviewed-on: https://review.couchbase.org/c/kv_engine/+/199229 Well-Formed: Restriction Checker Reviewed-by: Trond Norbye <[email protected]> Tested-by: Paolo Cocchi <[email protected]>
1 parent 0291f5f commit 424b513

File tree

6 files changed

+39
-63
lines changed

6 files changed

+39
-63
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -565,19 +565,21 @@ bool ActiveStream::backfillReceived(std::unique_ptr<Item> itm,
565565
return true;
566566
}
567567

568-
void ActiveStream::completeBackfill(std::chrono::steady_clock::duration runtime,
568+
void ActiveStream::completeBackfill(uint64_t maxScanSeqno,
569+
std::chrono::steady_clock::duration runtime,
569570
size_t diskBytesRead) {
570-
// maxSeqno is not needed for InOrder completion
571571
completeBackfillInner(
572-
BackfillType::InOrder, 0 /*maxSeqno*/, runtime, diskBytesRead);
572+
BackfillType::InOrder, maxScanSeqno, runtime, diskBytesRead);
573573
}
574574

575575
void ActiveStream::completeOSOBackfill(
576-
uint64_t maxSeqno,
576+
uint64_t maxScanSeqno,
577577
std::chrono::steady_clock::duration runtime,
578578
size_t diskBytesRead) {
579-
completeBackfillInner(
580-
BackfillType::OutOfSequenceOrder, maxSeqno, runtime, diskBytesRead);
579+
completeBackfillInner(BackfillType::OutOfSequenceOrder,
580+
maxScanSeqno,
581+
runtime,
582+
diskBytesRead);
581583
firstMarkerSent = true;
582584
}
583585

@@ -1949,7 +1951,7 @@ bool ActiveStream::tryAndScheduleOSOBackfill(DcpProducer& producer,
19491951

19501952
void ActiveStream::completeBackfillInner(
19511953
BackfillType backfillType,
1952-
uint64_t maxSeqno,
1954+
uint64_t maxScanSeqno,
19531955
std::chrono::steady_clock::duration runtime,
19541956
size_t diskBytesRead) {
19551957
{
@@ -1963,21 +1965,20 @@ void ActiveStream::completeBackfillInner(
19631965
}
19641966

19651967
if (backfillType == BackfillType::InOrder) {
1968+
const auto vb = engine->getVBucket(vb_);
1969+
if (!vb) {
1970+
log(spdlog::level::level_enum::warn,
1971+
"{} ActiveStream::completeBackfillInner(): Vbucket "
1972+
"does not exist",
1973+
logPrefix);
1974+
return;
1975+
}
1976+
19661977
// In-order backfills may require a seqno-advanced message if
19671978
// there is a stream filter present (e.g. only streaming a single
19681979
// collection).
19691980
if (isSeqnoAdvancedEnabled() &&
19701981
isSeqnoGapAtEndOfSnapshot(maxScanSeqno)) {
1971-
const auto vb = engine->getVBucket(vb_);
1972-
if (!vb) {
1973-
log(spdlog::level::level_enum::warn,
1974-
"{} ActiveStream::completeBackfillInner(): Vbucket "
1975-
"does not exist",
1976-
logPrefix);
1977-
}
1978-
1979-
// Not VB: Vbucket doens't exist anymore; We still need to send
1980-
// a SeqnoAdvance to bump the peer to end-of-snapshot.
19811982
// Active: We must send a SeqnoAdvanced to bump the DCP
19821983
// client's seqno to snap-end.
19831984
// Replica: Vbucket may transition backfill->memory without
@@ -1988,7 +1989,7 @@ void ActiveStream::completeBackfillInner(
19881989
const auto replicaVucketSeqnoAdvance =
19891990
maxScanSeqno > lastBackfilledSeqno &&
19901991
maxScanSeqno == lastSentSnapEndSeqno;
1991-
if (!vb || vb->getState() != vbucket_state_replica ||
1992+
if (vb->getState() != vbucket_state_replica ||
19921993
replicaVucketSeqnoAdvance) {
19931994
queueSeqnoAdvanced();
19941995
}
@@ -2007,8 +2008,6 @@ void ActiveStream::completeBackfillInner(
20072008
if (!isCollectionEnabledStream() && maxScanSeqno > lastReadSeqno) {
20082009
lastReadSeqno.store(maxScanSeqno);
20092010
}
2010-
// reset last seqno seen by backfill
2011-
maxScanSeqno = 0;
20122011
}
20132012

20142013
if (isBackfilling()) {
@@ -2055,19 +2054,19 @@ void ActiveStream::completeBackfillInner(
20552054
logPrefix);
20562055
} else if (
20572056
producer->isOutOfOrderSnapshotsEnabledWithSeqnoAdvanced() &&
2058-
maxSeqno != lastBackfilledSeqno) {
2057+
maxScanSeqno != lastBackfilledSeqno) {
20592058
pushToReadyQ(std::make_unique<SeqnoAdvanced>(
2060-
opaque_, vb_, sid, maxSeqno));
2061-
lastSentSeqnoAdvance.store(maxSeqno);
2059+
opaque_, vb_, sid, maxScanSeqno));
2060+
lastSentSeqnoAdvance.store(maxScanSeqno);
20622061
}
20632062

20642063
// Now that the OSO backfill has ended, we can tweak
20652064
// lastReadSeqno so that it reflects the end of the snapshot
20662065
// we've just processed. This ensures any pending backfill which
20672066
// follows continues from maxSeqno and not the max seqno of the
20682067
// collection(s) in the OSO scan, which could be way less.
2069-
if (maxSeqno > lastReadSeqno) {
2070-
lastReadSeqno = maxSeqno;
2068+
if (maxScanSeqno > lastReadSeqno) {
2069+
lastReadSeqno = maxScanSeqno;
20712070
}
20722071

20732072
pushToReadyQ(std::make_unique<OSOSnapshot>(

engines/ep/src/dcp/active_stream.h

Lines changed: 9 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -241,20 +241,24 @@ class ActiveStream : public Stream,
241241
backfill_source_t backfill_source);
242242

243243
/**
244+
* @param maxScanSeqno the maximum seqno of the snapshot supplying the OSO
245+
* backfill. A SeqnoAdvanced maybe sent if the last backfilled
246+
* item is not the maxSeqno item
244247
* @param runtime The total runtime the backfill took, measured as active
245-
* time executing (i.e. total BackfillManagerTask::run() durations for
246-
* this backfill)
248+
* time executing (i.e. total BackfillManagerTask::run() durations
249+
* for this backfill)
247250
* @param diskBytesRead The total number of bytes read from disk during
248251
* this backfill.
249252
*/
250-
void completeBackfill(std::chrono::steady_clock::duration runtime,
253+
void completeBackfill(uint64_t maxScanSeqno,
254+
std::chrono::steady_clock::duration runtime,
251255
size_t diskBytesRead);
252256

253257
/**
254258
* Queues a single "Out of Seqno Order" marker with the 'end' flag
255259
* into the ready queue.
256260
*
257-
* @param maxSeqno the maximum seqno of the snapshot supplying the OSO
261+
* @param maxScanSeqno the maximum seqno of the snapshot supplying the OSO
258262
* backfill. A SeqnoAdvanced maybe sent if the last backfilled
259263
* item is not the maxSeqno item
260264
* @param runtime The total runtime the backfill took, measured as active
@@ -263,7 +267,7 @@ class ActiveStream : public Stream,
263267
* @param diskBytesRead The total number of bytes read from disk during
264268
* this backfill.
265269
*/
266-
void completeOSOBackfill(uint64_t maxSeqno,
270+
void completeOSOBackfill(uint64_t maxScanSeqno,
267271
std::chrono::steady_clock::duration runtime,
268272
size_t diskBytesRead);
269273

@@ -428,22 +432,6 @@ class ActiveStream : public Stream,
428432
*/
429433
bool isIgnoringPurgedTombstones() const;
430434

431-
/**
432-
* Used to set the last read seqno from a scan of the data store layer.
433-
* (This is for external use only and ensures that
434-
* maxScanSeqno is zero prior to being set).
435-
* @param seqno last read seqno during a data store layer scan
436-
*/
437-
void setBackfillScanLastRead(uint64_t seqno) {
438-
std::unique_lock<std::mutex> lh(streamMutex);
439-
/*
440-
* maxScanSeqno should only be modified once per
441-
* backfill
442-
*/
443-
Expects(maxScanSeqno == 0);
444-
maxScanSeqno = seqno;
445-
}
446-
447435
/**
448436
* Method to get the collections filter of the stream
449437
* @return the filter object
@@ -887,15 +875,6 @@ class ActiveStream : public Stream,
887875
//! Last snapshot end seqno sent to the DCP client
888876
std::atomic<uint64_t> lastSentSnapEndSeqno;
889877

890-
/*
891-
* This is the highest seqno seen by KVStore when performing a scan
892-
* for the current snapshot we are back filling for. This is regardless of
893-
* collection or visibility. This used inform completeBackfill() of the last
894-
* read seqno from disk, to help make a decision on if we should enqueue an
895-
* SeqnoAdvanced op (see ::completeBackfill() for more info).
896-
*/
897-
uint64_t maxScanSeqno{0};
898-
899878
/* Flag used by checkpointCreatorTask that is set before all items are
900879
extracted for given checkpoint cursor, and is unset after all retrieved
901880
items are added to the readyQ */

engines/ep/src/dcp/backfill_by_seqno_disk.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ backfill_status_t DCPBackfillBySeqnoDisk::scan() {
193193
if (historyScan) {
194194
transitionState(backfill_state_scanning_history_snapshot);
195195
} else {
196-
stream->setBackfillScanLastRead(scanCtx->lastReadSeqno);
197196
transitionState(backfill_state_completing);
198197
}
199198
return backfill_success;
@@ -234,7 +233,8 @@ void DCPBackfillBySeqnoDisk::complete(bool cancelled) {
234233
}
235234

236235
const auto diskBytesRead = scanCtx ? scanCtx->diskBytesRead : 0;
237-
stream->completeBackfill(runtime, diskBytesRead);
236+
const auto maxScanSeqno = scanCtx ? scanCtx->lastReadSeqno : 0;
237+
stream->completeBackfill(maxScanSeqno, runtime, diskBytesRead);
238238

239239
auto severity = cancelled ? spdlog::level::level_enum::info
240240
: spdlog::level::level_enum::debug;

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,10 +390,10 @@ backfill_status_t DCPBackfillDisk::scanHistory() {
390390
Expects(kvstore);
391391
switch (kvstore->scanAllVersions(bySeqnoCtx)) {
392392
case scan_success:
393-
stream->setBackfillScanLastRead(bySeqnoCtx.lastReadSeqno);
394393
// Call complete and transition straight to done (via complete). This
395394
// avoids the sub-class (by_id or by_seq) complete function being called
396-
stream->completeBackfill(runtime, bySeqnoCtx.diskBytesRead);
395+
stream->completeBackfill(
396+
bySeqnoCtx.lastReadSeqno, runtime, bySeqnoCtx.diskBytesRead);
397397
transitionState(backfill_state_completing);
398398
transitionState(backfill_state_done);
399399
return backfill_success;

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,6 @@ backfill_status_t DCPBackfillMemoryBuffered::scan() {
324324
++rangeItr;
325325
}
326326

327-
stream->setBackfillScanLastRead(endSeqno);
328-
329327
/* Backfill has ran to completion */
330328
complete(false);
331329

@@ -350,7 +348,7 @@ void DCPBackfillMemoryBuffered::complete(bool cancelled) {
350348

351349
/* [EPHE TODO]: invalidate cursor sooner before it gets deleted */
352350

353-
stream->completeBackfill(runtime, 0);
351+
stream->completeBackfill(endSeqno, runtime, 0);
354352

355353
auto severity = cancelled ? spdlog::level::level_enum::info
356354
: spdlog::level::level_enum::debug;

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2063,7 +2063,7 @@ TEST_P(STParamPersistentBucketTest, test_mb22451) {
20632063
// flag to true
20642064
EXPECT_TRUE(mock_stream->public_getPendingBackfill())
20652065
<< "handleSlowStream should set pendingBackfill to True";
2066-
mock_stream->completeBackfill({}, {});
2066+
mock_stream->completeBackfill(0, {}, {});
20672067
EXPECT_FALSE(mock_stream->public_isBackfillTaskRunning())
20682068
<< "completeBackfill should set isBackfillTaskRunning to False";
20692069
EXPECT_TRUE(mock_stream->isBackfilling())

0 commit comments

Comments
 (0)