Skip to content

Commit 2c8b37e

Browse files
committed
MB-58961: Remove AS::isCurrentSnapshotCompleted()
Used only at ActiveStream::snapshot(), can be replaced with a 1-liner. Change-Id: Ief86d2b1dfdcc228fc355ca467345057883ecf78 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/199179 Tested-by: Paolo Cocchi <[email protected]> Reviewed-by: Vesko Karaganev <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Trond Norbye <[email protected]>
1 parent 7d508e6 commit 2c8b37e

File tree

2 files changed

+19
-23
lines changed

2 files changed

+19
-23
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,8 +1474,26 @@ void ActiveStream::snapshot(const OutstandingItemsResult& meta,
14741474

14751475
lastReadSeqno.store(newLastReadSeqno);
14761476

1477+
// Note: ActiveStream is in a complete snapshot
1478+
// - Always, on active vbuckets
1479+
// - If we have sent up to the last seqno in the last marker range, for
1480+
// non-active vbuckets
1481+
//
1482+
// @todo MB-58961:
1483+
// 1. Shouldn't it be a weak-inequality here (ie, <=) ?
1484+
// 2. Shouldn't we use lastSentSeqno in place of lastReadSeqno here?
1485+
// At the time of writing I'm pushing a non-logic change, so defer the above
1486+
const auto isReplicaSnapshotComplete =
1487+
lastSentSnapEndSeqno.load(std::memory_order_relaxed) <
1488+
lastReadSeqno;
1489+
1490+
// Note: Here we consider "!replica" rather than "active", but I believe
1491+
// that streaming from "pending|dead" is just illegal, so we should change
1492+
// this.
1493+
14771494
const auto vb = engine->getVBucket(vb_);
1478-
if (vb && isCurrentSnapshotCompleted(*vb)) {
1495+
if (vb && (vb->getState() != vbucket_state_replica ||
1496+
isReplicaSnapshotComplete)) {
14791497
// Get OptionalSeqnos which for the items list types should have values
14801498
auto seqnoStart = items.front()->getBySeqno();
14811499
auto seqnoEnd = items.back()->getBySeqno();
@@ -2303,17 +2321,6 @@ uint64_t ActiveStream::getLastSentSeqno() const {
23032321
return lastSentSeqno.load();
23042322
}
23052323

2306-
bool ActiveStream::isCurrentSnapshotCompleted(const VBucket& vb) const {
2307-
if (vb.getState() != vbucket_state_replica) {
2308-
return true;
2309-
}
2310-
// @todo MB-58961:
2311-
// 1. Shouldn't it be a weak-inequality here (ie, <=) ?
2312-
// 2. Shouldn't we use lastSentSeqno in place of lastReadSeqno here?
2313-
// At the time of writing I'm pushing a non-logic change, so defer the above
2314-
return lastSentSnapEndSeqno.load(std::memory_order_relaxed) < lastReadSeqno;
2315-
}
2316-
23172324
bool ActiveStream::dropCheckpointCursor_UNLOCKED() {
23182325
VBucketPtr vbucket = engine->getVBucket(vb_);
23192326
if (!vbucket) {

engines/ep/src/dcp/active_stream.h

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -711,17 +711,6 @@ class ActiveStream : public Stream,
711711
*/
712712
void scheduleBackfill_UNLOCKED(DcpProducer& producer, bool reschedule);
713713

714-
/**
715-
* ActiveStream is in a complete snapshot:
716-
* - Always, on active vbuckets
717-
* - If we have sent up to the last seqno in the last marker range, for
718-
* non-active vbuckets
719-
*
720-
* @param vb Reference to the underlying vbucket
721-
* @return Whether the stream is in a complete snapshot
722-
*/
723-
bool isCurrentSnapshotCompleted(const VBucket& vb) const;
724-
725714
/**
726715
* Drop the cursor registered with the checkpoint manager. Used during
727716
* cursor dropping. Upon failure to drop the cursor, puts stream to

0 commit comments

Comments
 (0)