Skip to content

Commit ab51b91

Browse files
committed
[BP] MB-58961: Remove ActiveStream::lastReadSeqnoUnSnapshotted
That is set to the last seqno that a stream has got from checkpoints but still not pushed into the stream readyQ. Used for passing that information down to ActiveStream::snapshot(<items>) and setting AS::lastReadSeqno. That doesn't need to be a class member as that information is computed locally when the stream makes up a snapshot from checkpoint. Also, lastReadSeqnoUnSnapshotted represents a non-atomic information on the stream state, as that is updated for every item processed under streamLock in the middle of the execution of AS::nextCheckpointItemTask(). Change-Id: Iac36349920203e81ba9898137a902c2d1def2048 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/198886 Reviewed-by: Trond Norbye <[email protected]> Tested-by: Paolo Cocchi <[email protected]> Well-Formed: Restriction Checker
1 parent 4597def commit ab51b91

File tree

3 files changed

+14
-18
lines changed

3 files changed

+14
-18
lines changed

engines/ep/docs/stats.org

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -741,8 +741,6 @@ another colon. For example, if your client is named, =slave1=, the
741741
| last_sent_seqno | The last seqno sent by this stream |
742742
| last_sent_snap_end_seqno | The last snapshot end seqno sent by active stream |
743743
| last_read_seqno | The last seqno read by this stream from disk or memory|
744-
| last_read_seqno_unsnapshotted | The last sequence number queued from memory, but is |
745-
| | yet to be put in a snapshot |
746744
| ready_queue_memory | Memory occupied by elements in the DCP readyQ |
747745
| memory_phase | The amount of items sent during the memory phase |
748746
| opaque | The unique stream identifier |

engines/ep/src/dcp/active_stream.cc

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
6666
includeValue(includeVal),
6767
includeXattributes(includeXattrs),
6868
includeDeletedUserXattrs(includeDeletedUserXattrs),
69-
lastReadSeqnoUnSnapshotted(st_seqno),
7069
lastSentSeqno(st_seqno),
7170
lastSentSeqnoAdvance(0),
7271
curChkSeqno(st_seqno),
@@ -846,8 +845,6 @@ void ActiveStream::addStats(const AddStatFn& add_stat, const CookieIface* c) {
846845
addStat("last_sent_snap_end_seqno",
847846
lastSentSnapEndSeqno.load(std::memory_order_relaxed));
848847
addStat("last_read_seqno", lastReadSeqno.load());
849-
addStat("last_read_seqno_unsnapshotted",
850-
lastReadSeqnoUnSnapshotted.load());
851848
addStat("ready_queue_memory", getReadyQueueMemory());
852849
addStat("backfill_buffer_bytes", bufferedBackfill.bytes.load());
853850
addStat("backfill_buffer_items", bufferedBackfill.items.load());
@@ -1317,6 +1314,7 @@ void ActiveStream::processItems(
13171314
* sent meaning the snapshot was never completed.
13181315
*/
13191316
std::optional<uint64_t> highNonVisibleSeqno;
1317+
uint64_t newLastReadSeqno = 0;
13201318
for (auto& qi : outstandingItemsResult.items) {
13211319
if (qi->getOperation() == queue_op::checkpoint_end) {
13221320
// At the end of each checkpoint remove its snapshot range, so
@@ -1337,7 +1335,8 @@ void ActiveStream::processItems(
13371335
snapshot(outstandingItemsResult,
13381336
mutations,
13391337
visibleSeqno,
1340-
highNonVisibleSeqno);
1338+
highNonVisibleSeqno,
1339+
newLastReadSeqno);
13411340
/* clear out all the mutations since they are already put
13421341
onto the readyQ */
13431342
mutations.clear();
@@ -1365,7 +1364,8 @@ void ActiveStream::processItems(
13651364
}
13661365

13671366
if (shouldProcessItem(*qi)) {
1368-
lastReadSeqnoUnSnapshotted = qi->getBySeqno();
1367+
newLastReadSeqno = qi->getBySeqno();
1368+
13691369
// Check if the item is allowed on the stream, note the filter
13701370
// updates itself for collection deletion events
13711371
if (filter.checkAndUpdate(*qi)) {
@@ -1387,7 +1387,8 @@ void ActiveStream::processItems(
13871387
snapshot(outstandingItemsResult,
13881388
mutations,
13891389
visibleSeqno,
1390-
highNonVisibleSeqno);
1390+
highNonVisibleSeqno,
1391+
newLastReadSeqno);
13911392
} else if (isSeqnoAdvancedEnabled()) {
13921393
// Note that we cannot enter this case if supportSyncReplication()
13931394
// returns true (see isSeqnoAdvancedEnabled). This means that we
@@ -1476,13 +1477,13 @@ bool ActiveStream::shouldProcessItem(const Item& item) {
14761477
void ActiveStream::snapshot(const OutstandingItemsResult& meta,
14771478
std::deque<std::unique_ptr<DcpResponse>>& items,
14781479
uint64_t maxVisibleSeqno,
1479-
std::optional<uint64_t> highNonVisibleSeqno) {
1480+
std::optional<uint64_t> highNonVisibleSeqno,
1481+
uint64_t newLastReadSeqno) {
14801482
if (items.empty()) {
14811483
return;
14821484
}
14831485

1484-
/* This assumes that all items in the "items deque" is put onto readyQ */
1485-
lastReadSeqno.store(lastReadSeqnoUnSnapshotted);
1486+
lastReadSeqno.store(newLastReadSeqno);
14861487

14871488
if (isCurrentSnapshotCompleted()) {
14881489
// Get OptionalSeqnos which for the items list types should have values

engines/ep/src/dcp/active_stream.h

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -632,8 +632,7 @@ class ActiveStream : public Stream,
632632
std::atomic<size_t> sent = 0;
633633
} backfillItems;
634634

635-
/* The last sequence number queued from disk or memory and is
636-
snapshotted and put onto readyQ */
635+
// The last sequence number queued into the readyQ from disk or memory
637636
AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastReadSeqno;
638637

639638
/* backfill ById or BySeqno updates this member during the scan, then
@@ -694,11 +693,13 @@ class ActiveStream : public Stream,
694693
* @param highNonVisibleSeqno the snapEnd seqno that includes any non
695694
* visible mutations i.e. prepares and aborts. This is only used when
696695
* collections is enabled and sync writes are not supported on the stream.
696+
* @param newLastReadSeqno The new lastReadSeqno, see member for details.
697697
*/
698698
void snapshot(const OutstandingItemsResult& meta,
699699
std::deque<std::unique_ptr<DcpResponse>>& items,
700700
uint64_t maxVisibleSeqno,
701-
std::optional<uint64_t> highNonVisibleSeqno);
701+
std::optional<uint64_t> highNonVisibleSeqno,
702+
uint64_t newLastReadSeqno);
702703

703704
void endStream(cb::mcbp::DcpStreamEndStatus reason);
704705

@@ -839,10 +840,6 @@ class ActiveStream : public Stream,
839840
//! Number of times a backfill is paused.
840841
cb::RelaxedAtomic<uint64_t> numBackfillPauses{0};
841842

842-
/* The last sequence number queued from memory, but is yet to be
843-
snapshotted and put onto readyQ */
844-
std::atomic<uint64_t> lastReadSeqnoUnSnapshotted;
845-
846843
//! The last sequence number sent to the network layer
847844
AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastSentSeqno;
848845

0 commit comments

Comments
 (0)