@@ -1444,49 +1444,61 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
14441444 }
14451445
14461446 uint64_t backfillStart = lastReadSeqno.load () + 1 ;
1447- uint64_t backfillEnd = 0 ;
1448- bool tryBackfill = false ;
1447+ uint64_t backfillEnd;
1448+ bool tryBackfill;
14491449
1450- if ((flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) || reschedule) {
1451- uint64_t vbHighSeqno = static_cast <uint64_t >(vbucket->getHighSeqno ());
1452- if (lastReadSeqno.load () > vbHighSeqno) {
1453- throw std::logic_error (" ActiveStream::scheduleBackfill_UNLOCKED: "
1454- " lastReadSeqno (which is " +
1455- std::to_string (lastReadSeqno.load ()) +
1456- " ) is greater than vbHighSeqno (which is " +
1457- std::to_string (vbHighSeqno) + " ). " +
1458- " for stream " + producer->logHeader () +
1459- " ; vb " + std::to_string (vb_));
1460- }
1461- if (reschedule) {
1462- /* We need to do this for reschedule because in case of
1463- DCP_ADD_STREAM_FLAG_DISKONLY (the else part), end_seqno_ is
1464- set to last persisted seqno befor calling
1465- scheduleBackfill_UNLOCKED() */
1466- backfillEnd = engine->getKVBucket ()->getLastPersistedSeqno (vb_);
1467- } else {
1468- backfillEnd = end_seqno_;
1469- }
1450+ if (flags_ & static_cast <uint64_t >(DCP_ADD_STREAM_FLAG_DISKONLY)) {
1451+ // if disk only, always backfill to the requested end seqno
1452+ backfillEnd = end_seqno_;
14701453 tryBackfill = true ;
14711454 } else {
1455+ /* not disk only - stream may require backfill but will transition to
1456+ * in-memory afterward; register the cursor now.
1457+ * There are two expected cases:
1458+ * 1: registerResult.second=true, which means
1459+ * - Cursor at start of first checkpoint
1460+ * - CheckpointManager can't provide all the items needed
1461+ * so a backfill may be required before moving to
1462+ * in-memory streaming.
1463+ * 2: registerResult.second=false
1464+ * - The CheckpointManager contains the required items
1465+ * - No backfill needed
1466+ */
1467+
1468+ CursorRegResult registerResult;
14721469 try {
1473- std::tie (curChkSeqno, tryBackfill) =
1474- vbucket->checkpointManager ->registerCursorBySeqno (
1475- cursorName,
1476- lastReadSeqno.load (),
1477- MustSendCheckpointEnd::NO);
1478- cursorRegistered = true ;
1479- } catch (std::exception& error) {
1470+ registerResult = vbucket->checkpointManager ->registerCursorBySeqno (
1471+ cursorName,
1472+ lastReadSeqno.load (),
1473+ MustSendCheckpointEnd::NO);
1474+ } catch (std::exception& error) {
14801475 log (EXTENSION_LOG_WARNING,
14811476 " (vb %" PRIu16
14821477 " ) Failed to register "
14831478 " cursor: %s" ,
14841479 vb_,
14851480 error.what ());
14861481 endStream (END_STREAM_STATE);
1482+ return ;
14871483 }
14881484
1485+ curChkSeqno = registerResult.first ;
1486+ tryBackfill = registerResult.second ;
1487+ cursorRegistered = true ;
1488+
1489+ log (EXTENSION_LOG_INFO,
1490+ " (vb %" PRIu16
1491+ " ) ActiveStream::scheduleBackfill_UNLOCKED register cursor "
1492+ " with "
1493+ " name \" {}\" backfill:{}, seqno:{}" ,
1494+ vb_,
1495+ name_.c_str (),
1496+ tryBackfill,
1497+ curChkSeqno.load ());
1498+
14891499 if (lastReadSeqno.load () > curChkSeqno) {
1500+ // something went wrong registering the cursor - it is too early
1501+ // and could read items this stream has already sent.
14901502 throw std::logic_error (" ActiveStream::scheduleBackfill_UNLOCKED: "
14911503 " lastReadSeqno (which is " +
14921504 std::to_string (lastReadSeqno.load ()) +
@@ -1496,23 +1508,18 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
14961508 " ; vb " + std::to_string (vb_));
14971509 }
14981510
1499- /* We need to find the minimum seqno that needs to be backfilled in
1500- * order to make sure that we don't miss anything when transitioning
1501- * to a memory snapshot. The backfill task will always make sure that
1502- * the backfill end seqno is contained in the backfill.
1503- */
1504- if (backfillStart < curChkSeqno) {
1505- if (curChkSeqno > end_seqno_) {
1506- /* Backfill only is enough */
1507- backfillEnd = end_seqno_;
1508- } else {
1509- /* Backfill + in-memory streaming */
1510- backfillEnd = curChkSeqno - 1 ;
1511- }
1512- }
1511+ // _if_ a backfill is required, it should end either at the
1512+ // requested stream end seqno OR the seqno immediately
1513+ // before what the checkpoint manager can provide
1514+ // - whichever is lower.
1515+ backfillEnd = std::min (end_seqno_, curChkSeqno - 1 );
15131516 }
15141517
1515- if (backfillStart <= backfillEnd && tryBackfill) {
1518+ if (tryBackfill &&
1519+ producer->scheduleBackfillManager (
1520+ *vbucket, shared_from_this (), backfillStart, backfillEnd)) {
1521+ // backfill will be needed to catch up to the items in the
1522+ // CheckpointManager
15161523 log (EXTENSION_LOG_NOTICE,
15171524 " (vb %" PRIu16
15181525 " ) Scheduling backfill "
@@ -1523,79 +1530,17 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
15231530 backfillStart,
15241531 backfillEnd,
15251532 reschedule ? " True" : " False" );
1526- producer->scheduleBackfillManager (
1527- *vbucket, shared_from_this (), backfillStart, backfillEnd);
1533+
15281534 isBackfillTaskRunning.store (true );
15291535 // / Number of backfill items is unknown until the Backfill task
15301536 // / completes the scan phase - reset backfillRemaining counter.
15311537 backfillRemaining.reset ();
15321538 } else {
1533- if (reschedule) {
1534- // Infrequent code path, see comment below.
1535- log (EXTENSION_LOG_NOTICE,
1536- " (vb %" PRIu16
1537- " ) Did not schedule "
1538- " backfill with reschedule : True, "
1539- " tryBackfill : True; "
1540- " backfillStart : %" PRIu64
1541- " , "
1542- " backfillEnd : %" PRIu64
1543- " , "
1544- " flags_ : %" PRIu32
1545- " , "
1546- " start_seqno_ : %" PRIu64
1547- " , "
1548- " end_seqno_ : %" PRIu64
1549- " , "
1550- " lastReadSeqno : %" PRIu64
1551- " , "
1552- " lastSentSeqno : %" PRIu64
1553- " , "
1554- " curChkSeqno : %" PRIu64
1555- " , "
1556- " itemsReady : %s" ,
1557- vb_,
1558- backfillStart,
1559- backfillEnd,
1560- flags_,
1561- start_seqno_,
1562- end_seqno_,
1563- lastReadSeqno.load (),
1564- lastSentSeqno.load (),
1565- curChkSeqno.load (),
1566- itemsReady ? " True" : " False" );
1567-
1568- /* Cursor was dropped, but we will not do backfill.
1569- * This may happen in a corner case where, the memory usage is high
1570- * due to other vbuckets and persistence cursor moves ahead of
1571- * replication cursor to new checkpoint open but does not persist
1572- * items yet.
1573- *
1574- * Because we dropped the cursor but did not do a backfill (and
1575- * therefore did not re-register a cursor in markDiskSnapshot) we
1576- * must re-register the cursor here.
1577- */
1578- try {
1579- CursorRegResult result =
1580- vbucket->checkpointManager ->registerCursorBySeqno (
1581- cursorName,
1582- lastReadSeqno.load (),
1583- MustSendCheckpointEnd::NO);
1584- cursorRegistered = true ;
1585- curChkSeqno = result.first ;
1586- } catch (std::exception& error) {
1587- log (EXTENSION_LOG_WARNING,
1588- " (vb %" PRIu16
1589- " ) Failed to register "
1590- " cursor: %s" ,
1591- vb_,
1592- error.what ());
1593- endStream (END_STREAM_STATE);
1594- }
1595- }
1596- if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
1539+ // backfill not needed
1540+ if (flags_ & static_cast <uint64_t >(DCP_ADD_STREAM_FLAG_DISKONLY)) {
15971541 endStream (END_STREAM_OK);
1598- } else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
1542+ } else if (flags_ &
1543+ static_cast <uint64_t >(DCP_ADD_STREAM_FLAG_TAKEOVER)) {
15991544 transitionState (StreamState::TakeoverSend);
16001545 } else {
16011546 transitionState (StreamState::InMemory);
0 commit comments