Skip to content

Commit 7d508e6

Browse files
committed
MB-58961: AS::isCurrentSnapshotCompleted() takes VBucket&
Change-Id: I7a75333f51f05fe3067ccfc48688f994bd9f9546 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/199178 Reviewed-by: Vesko Karaganev <[email protected]> Well-Formed: Restriction Checker Tested-by: Paolo Cocchi <[email protected]>
1 parent a823cc6 commit 7d508e6

File tree

2 files changed

+36
-22
lines changed

2 files changed

+36
-22
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,7 +1474,8 @@ void ActiveStream::snapshot(const OutstandingItemsResult& meta,
14741474

14751475
lastReadSeqno.store(newLastReadSeqno);
14761476

1477-
if (isCurrentSnapshotCompleted()) {
1477+
const auto vb = engine->getVBucket(vb_);
1478+
if (vb && isCurrentSnapshotCompleted(*vb)) {
14781479
// Get OptionalSeqnos which for the items list types should have values
14791480
auto seqnoStart = items.front()->getBySeqno();
14801481
auto seqnoEnd = items.back()->getBySeqno();
@@ -2302,17 +2303,15 @@ uint64_t ActiveStream::getLastSentSeqno() const {
23022303
return lastSentSeqno.load();
23032304
}
23042305

2305-
bool ActiveStream::isCurrentSnapshotCompleted() const {
2306-
VBucketPtr vbucket = engine->getVBucket(vb_);
2307-
// An atomic read of vbucket state without acquiring the
2308-
// reader lock for state should suffice here.
2309-
if (vbucket && vbucket->getState() == vbucket_state_replica) {
2310-
if (lastSentSnapEndSeqno.load(std::memory_order_relaxed) >=
2311-
lastReadSeqno) {
2312-
return false;
2313-
}
2306+
bool ActiveStream::isCurrentSnapshotCompleted(const VBucket& vb) const {
2307+
if (vb.getState() != vbucket_state_replica) {
2308+
return true;
23142309
}
2315-
return true;
2310+
// @todo MB-58961:
2311+
// 1. Shouldn't it be a weak-inequality here (ie, <=) ?
2312+
// 2. Shouldn't we use lastSentSeqno in place of lastReadSeqno here?
2313+
// At the time of writing I'm pushing a non-logic change, so defer the above
2314+
return lastSentSnapEndSeqno.load(std::memory_order_relaxed) < lastReadSeqno;
23162315
}
23172316

23182317
bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
@@ -2509,19 +2508,25 @@ bool ActiveStream::isSeqnoAdvancedNeededBackFill() const {
25092508
* case we do not want to send a SeqnoAdvanced at the end of backfill.
25102509
* So check that we don't have an in memory range to stream from.
25112510
*/
2512-
auto vb = engine->getVBucket(vb_);
2513-
if (vb) {
2514-
if (vb->getState() == vbucket_state_replica) {
2515-
return maxScanSeqno > lastBackfilledSeqno &&
2516-
maxScanSeqno == lastSentSnapEndSeqno.load();
2517-
}
2518-
} else {
2511+
const auto vb = engine->getVBucket(vb_);
2512+
2513+
// Note: Early returns 'true' maintains the same logic as the parent commit,
2514+
// no logic change
2515+
if (!vb) {
25192516
log(spdlog::level::level_enum::warn,
2520-
"{} isSeqnoAdvancedNeededBackFill() for vbucket which does not "
2521-
"exist",
2517+
"{} ActiveStream::isSeqnoAdvancedNeededBackFill() for vbucket "
2518+
"which does not exist",
25222519
logPrefix);
2520+
return true;
25232521
}
2524-
return isCurrentSnapshotCompleted();
2522+
2523+
if (vb->getState() != vbucket_state_replica) {
2524+
return true;
2525+
}
2526+
2527+
// vbucket_state_replica
2528+
return maxScanSeqno > lastBackfilledSeqno &&
2529+
maxScanSeqno == lastSentSnapEndSeqno;
25252530
}
25262531

25272532
bool ActiveStream::isSeqnoGapAtEndOfSnapshot(uint64_t streamSeqno) const {

engines/ep/src/dcp/active_stream.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,16 @@ class ActiveStream : public Stream,
711711
*/
712712
void scheduleBackfill_UNLOCKED(DcpProducer& producer, bool reschedule);
713713

714-
bool isCurrentSnapshotCompleted() const;
714+
/**
715+
* ActiveStream is in a complete snapshot:
716+
* - Always, on active vbuckets
717+
* - If we have sent up to the last seqno in the last marker range, for
718+
* non-active vbuckets
719+
*
720+
* @param vb Reference to the underlying vbucket
721+
* @return Whether the stream is in a complete snapshot
722+
*/
723+
bool isCurrentSnapshotCompleted(const VBucket& vb) const;
715724

716725
/**
717726
* Drop the cursor registered with the checkpoint manager. Used during

0 commit comments

Comments
 (0)