@@ -289,13 +289,10 @@ bool ActiveStream::markDiskSnapshot(uint64_t startSeqno,
289289 }
290290 }
291291
292- /* We need to send the requested 'snap_start_seqno_' as the snapshot
292+ /* We may need to send the requested 'snap_start_seqno_' as the snapshot
293293 start when we are sending the first snapshot because the first
294294 snapshot could be resumption of a previous snapshot */
295- if (!firstMarkerSent) {
296- startSeqno = std::min (snap_start_seqno_, startSeqno);
297- firstMarkerSent = true ;
298- }
295+ startSeqno = adjustStartIfFirstSnapshot (startSeqno);
299296
300297 VBucketPtr vb = engine->getVBucket (vb_);
301298 if (!vb) {
@@ -1279,55 +1276,53 @@ void ActiveStream::processItems(
12791276 outstandingItemsResult.highCompletedSeqno ,
12801277 visibleSeqno,
12811278 highNonVisibleSeqno);
1282- } else if (!firstMarkerSent && isSeqnoAdvancedEnabled () &&
1283- lastReadSeqno < snap_end_seqno_) {
1284- // MB-47009: This first snapshot has been completely filtered away.
1285- // The remaining items must not of been for this client. We must
1286- // still send a snapshot marker so that the client is moved to their
1287- // end seqno - so a snapshot + seqno advance is needed.
1288- firstMarkerSent = true ;
1289-
1279+ } else if (isSeqnoAdvancedEnabled ()) {
12901280 // Note that we cannot enter this case if supportSyncReplication()
12911281 // returns true (see isSeqnoAdvancedEnabled). This means that we
12921282 // do not need to set the HCS/MVS or timestamp parameters of the
12931283 // snapshot marker. MB-47877 tracks enabling sync-writes+filtering
1294- pushToReadyQ (std::make_unique<SnapshotMarker>(opaque_,
1295- vb_,
1296- snap_start_seqno_,
1297- snap_end_seqno_,
1298- MARKER_FLAG_MEMORY,
1299- std::nullopt ,
1300- std::nullopt ,
1301- std::nullopt ,
1302- sid));
1303-
1304- lastSentSnapEndSeqno.store (snap_end_seqno_,
1305- std::memory_order_relaxed);
1306- nextSnapshotIsCheckpoint = false ;
1307-
1308- queueSeqnoAdvanced ();
1309- } else if (isSeqnoAdvancedEnabled () &&
1310- isSeqnoGapAtEndOfSnapshot (curChkSeqno)) {
1311- auto vb = engine->getVBucket (getVBucket ());
1312- if (vb) {
1313- if (vb->getState () == vbucket_state_replica) {
1314- /*
1315- * If this is a collection stream and we're not sending any
1316- * mutations from memory and we haven't queued a snapshot
1317- * shot and we're a replica. Then our snapshot covers
1318- * backfill and in memory. So we have one snapshot marker
1319- * for both items on disk and in memory. Thus, we need to
1320- * send a SeqnoAdvanced to push the consumer's seqno to the
1321- * end of the snapshot. This is need when no items for the
1322- * collection we're streaming are present in memory.
1323- */
1324- queueSeqnoAdvanced ();
1284+ if (!firstMarkerSent && lastReadSeqno < snap_end_seqno_) {
1285+ // MB-47009: This first snapshot has been completely filtered
1286+ // away. The remaining items must not of been for this client.
1287+ // We must still send a snapshot marker so that the client is
1288+ // moved to their end seqno - so a snapshot + seqno advance is
1289+ // needed.
1290+ sendSnapshotAndSeqnoAdvanced (
1291+ outstandingItemsResult.checkpointType ,
1292+ snap_start_seqno_,
1293+ snap_end_seqno_);
1294+ firstMarkerSent = true ;
1295+ } else if (isSeqnoGapAtEndOfSnapshot (curChkSeqno)) {
1296+ auto vb = engine->getVBucket (getVBucket ());
1297+ if (vb) {
1298+ if (vb->getState () == vbucket_state_replica) {
1299+ /*
1300+ * If this is a collection stream and we're not sending
1301+ * any mutations from memory and we haven't queued a
1302+ * snapshot and we're a replica. Then our snapshot
1303+ * covers backfill and in memory. So we have one
1304+ * snapshot marker for both items on disk and in memory.
1305+ * Thus, we need to send a SeqnoAdvanced to push the
1306+ * consumer's seqno to the end of the snapshot. This is
1307+ * needed when no items for the collection we're
1308+ * streaming are present in memory.
1309+ */
1310+ queueSeqnoAdvanced ();
1311+ }
1312+ } else {
1313+ log (spdlog::level::level_enum::warn,
1314+ " {} processItems() for vbucket which does not "
1315+ " exist" ,
1316+ logPrefix);
13251317 }
1326- } else {
1327- log (spdlog::level::level_enum::warn,
1328- " {} processItems() for vbucket which does not "
1329- " exist" ,
1330- logPrefix);
1318+ } else if (highNonVisibleSeqno &&
1319+ curChkSeqno >= highNonVisibleSeqno.value ()) {
1320+ // MB-48368: Nothing directly available for the stream, but a
1321+ // non-visible item was available - bring the client up-to-date
1322+ sendSnapshotAndSeqnoAdvanced (
1323+ outstandingItemsResult.checkpointType ,
1324+ highNonVisibleSeqno.value (),
1325+ highNonVisibleSeqno.value ());
13311326 }
13321327 }
13331328 }
@@ -2223,3 +2218,35 @@ bool ActiveStream::isSeqnoGapAtEndOfSnapshot(uint64_t streamSeqno) const {
22232218 return (lastSentSnapEndSeqno.load () > lastReadSeqno.load ()) &&
22242219 lastSentSnapEndSeqno.load () == streamSeqno;
22252220}
2221+
2222+ void ActiveStream::sendSnapshotAndSeqnoAdvanced (CheckpointType checkpointType,
2223+ uint64_t start,
2224+ uint64_t end) {
2225+ start = adjustStartIfFirstSnapshot (start);
2226+
2227+ const auto isCkptTypeDisk = checkpointType == CheckpointType::Disk;
2228+ uint32_t flags = isCkptTypeDisk ? MARKER_FLAG_DISK : MARKER_FLAG_MEMORY;
2229+
2230+ pushToReadyQ (std::make_unique<SnapshotMarker>(opaque_,
2231+ vb_,
2232+ start,
2233+ end,
2234+ flags,
2235+ std::nullopt ,
2236+ std::nullopt ,
2237+ std::nullopt ,
2238+ sid));
2239+
2240+ lastSentSnapEndSeqno.store (end, std::memory_order_relaxed);
2241+ nextSnapshotIsCheckpoint = false ;
2242+
2243+ queueSeqnoAdvanced ();
2244+ }
2245+
2246+ uint64_t ActiveStream::adjustStartIfFirstSnapshot (uint64_t start) {
2247+ if (!firstMarkerSent) {
2248+ firstMarkerSent = true ;
2249+ return std::min (snap_start_seqno_, start);
2250+ }
2251+ return start;
2252+ }
0 commit comments