@@ -1966,9 +1966,34 @@ void ActiveStream::completeBackfillInner(
19661966 // In-order backfills may require a seqno-advanced message if
19671967 // there is a stream filter present (e.g. only streaming a single
19681968 // collection).
1969- if (isSeqnoAdvancedNeededBackFill ()) {
1970- queueSeqnoAdvanced ();
1969+ if (isSeqnoAdvancedEnabled () &&
1970+ isSeqnoGapAtEndOfSnapshot (maxScanSeqno)) {
1971+ const auto vb = engine->getVBucket (vb_);
1972+ if (!vb) {
1973+ log (spdlog::level::level_enum::warn,
1974+ " {} ActiveStream::completeBackfillInner(): Vbucket "
1975+ " does not exist" ,
1976+ logPrefix);
1977+ }
1978+
1979+ // Not VB: Vbucket doens't exist anymore; We still need to send
1980+ // a SeqnoAdvance to bump the peer to end-of-snapshot.
1981+ // Active: We must send a SeqnoAdvanced to bump the DCP
1982+ // client's seqno to snap-end.
1983+ // Replica: Vbucket may transition backfill->memory without
1984+ // sending another snapshot. Thus, in this case we
1985+ // do not want to send a SeqnoAdvanced at the end
1986+ // of backfill. So check that we don't have an in
1987+ // memory range to stream from.
1988+ const auto replicaVucketSeqnoAdvance =
1989+ maxScanSeqno > lastBackfilledSeqno &&
1990+ maxScanSeqno == lastSentSnapEndSeqno;
1991+ if (!vb || vb->getState () != vbucket_state_replica ||
1992+ replicaVucketSeqnoAdvance) {
1993+ queueSeqnoAdvanced ();
1994+ }
19711995 }
1996+
19721997 // Client does not support collections, so we cannot send a
19731998 // seqno-advanced message to tell them that the last streamed seqno
19741999 // is below the snap_end. However, we should still move the
@@ -2503,39 +2528,6 @@ bool ActiveStream::isSeqnoAdvancedEnabled() const {
25032528 !flatBuffersSystemEventsEnabled);
25042529}
25052530
2506- bool ActiveStream::isSeqnoAdvancedNeededBackFill () const {
2507- if (!isSeqnoAdvancedEnabled () || !isSeqnoGapAtEndOfSnapshot (maxScanSeqno)) {
2508- return false ;
2509- }
2510- /* *
2511- * In most cases we want to send a SeqnoAdvanced op if we have not sent
2512- * the final seqno in the snapshot at the end of backfill. However,
2513- * replica vbucket may transition their snapshot from backfill to
2514- * streaming from memory without sending another snapshot. Thus, in this
2515- * case we do not want to send a SeqnoAdvanced at the end of backfill.
2516- * So check that we don't have an in memory range to stream from.
2517- */
2518- const auto vb = engine->getVBucket (vb_);
2519-
2520- // Note: Early returns 'true' maintains the same logic as the parent commit,
2521- // no logic change
2522- if (!vb) {
2523- log (spdlog::level::level_enum::warn,
2524- " {} ActiveStream::isSeqnoAdvancedNeededBackFill() for vbucket "
2525- " which does not exist" ,
2526- logPrefix);
2527- return true ;
2528- }
2529-
2530- if (vb->getState () != vbucket_state_replica) {
2531- return true ;
2532- }
2533-
2534- // vbucket_state_replica
2535- return maxScanSeqno > lastBackfilledSeqno &&
2536- maxScanSeqno == lastSentSnapEndSeqno;
2537- }
2538-
25392531bool ActiveStream::isSeqnoGapAtEndOfSnapshot (uint64_t streamSeqno) const {
25402532 return (lastSentSnapEndSeqno.load () > lastReadSeqno.load ()) &&
25412533 lastSentSnapEndSeqno.load () == streamSeqno;
0 commit comments