@@ -1366,14 +1366,28 @@ void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) {
13661366// At the end of the scan, all outstanding Prepared items (which did not
13671367// have a Commit persisted to disk) will be registered with the Durability
13681368// Monitor.
1369- void EPBucket::loadPreparedSyncWrites (
1369+ EPBucket::LoadPreparedSyncWritesResult EPBucket::loadPreparedSyncWrites (
13701370 folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) {
13711371 /// Disk load callback for scan.
13721372 struct LoadSyncWrites : public StatusCallback <GetValue> {
1373- LoadSyncWrites (EPVBucket& vb) : vb(vb) {
1373+ LoadSyncWrites (EPVBucket& vb, uint64_t highPreparedSeqno)
1374+ : vb(vb), highPreparedSeqno(highPreparedSeqno) {
13741375 }
13751376
13761377 void callback (GetValue& val) override {
1378+ // Abort the scan early if we have passed the HPS as we don't need
1379+ // to load any more prepares.
1380+ if (val.item ->getBySeqno () >
1381+ static_cast <int64_t >(highPreparedSeqno)) {
1382+ // ENOMEM may seem like an odd status code to abort the scan but
1383+ // disk backfill to a given seqno also returns ENGINE_ENOMEM
1384+ // when it has received all the seqnos that it cares about to
1385+ // abort the scan.
1386+ setStatus (ENGINE_ENOMEM);
1387+ return ;
1388+ }
1389+
1390+ itemsVisited++;
13771391 if (val.item ->isPending ()) {
13781392 // Pending item which was not aborted (deleted). Add to
13791393 // outstanding Prepare map.
@@ -1392,6 +1406,13 @@ void EPBucket::loadPreparedSyncWrites(
13921406
13931407 EPVBucket& vb;
13941408
1409+ // HPS after which we can abort the scan
1410+ uint64_t highPreparedSeqno = std::numeric_limits<uint64_t >::max();
1411+
1412+ // Number of items our callback "visits". Used to validate how many
1413+ // items we look at when loading SyncWrites.
1414+ uint64_t itemsVisited = 0 ;
1415+
13951416 /// Map of Document key -> outstanding (not yet Committed / Aborted)
13961417 /// prepares.
13971418 std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
@@ -1401,18 +1422,39 @@ void EPBucket::loadPreparedSyncWrites(
14011422 auto & epVb = dynamic_cast <EPVBucket&>(vb);
14021423 const auto start = std::chrono::steady_clock::now();
14031424
1404- // @TODO MB-34017: We can optimise this by starting the scan at the
1405- // high_committed_seqno - all earlier prepares would have been committed
1406- // (or were aborted) and only scanning up to the high prepared seqno.
1407- uint64_t startSeqno = 0 ;
1408-
14091425 // Get the kvStore. Using the RW store as the rollback code that will call
14101426 // this function will modify vbucket_state that will only be reflected in
14111427 // RW store. For warmup case, we don't allow writes at this point in time
14121428 // anyway.
14131429 auto * kvStore = getRWUnderlyingByShard(epVb.getShard()->getId ());
14141430
1415- auto storageCB = std::make_shared<LoadSyncWrites>(epVb);
1431+ // Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1432+ auto vbState = kvStore->getVBucketState (epVb.getId());
1433+ if (!vbState) {
1434+ throw std::logic_error (" EPBucket::loadPreparedSyncWrites: processing " +
1435+ epVb.getId ().to_string () +
1436+ " , but found no vbucket_state" );
1437+ }
1438+
1439+ // Insert all outstanding Prepares into the VBucket (HashTable &
1440+ // DurabilityMonitor).
1441+ std::vector<queued_item> prepares;
1442+ if (vbState->highPreparedSeqno == vbState->highCompletedSeqno) {
1443+ // We don't need to warm up anything for this vBucket as all of our
1444+ // prepares have been completed, but we do need to create the DM
1445+ // with our vbucket_state.
1446+ epVb.loadOutstandingPrepares (vbStateLh, *vbState, std::move (prepares));
1447+ // No prepares loaded
1448+ return {0 , 0 };
1449+ }
1450+
1451+ // We optimise this step by starting the scan at the seqno following the
1452+ // High Completed Seqno. By definition, all earlier prepares have been
1453+ // completed (Committed or Aborted).
1454+ const uint64_t startSeqno = vbState->highCompletedSeqno + 1 ;
1455+
1456+ auto storageCB =
1457+ std::make_shared<LoadSyncWrites>(epVb, vbState->highPreparedSeqno);
14161458
14171459 // Don't expect to find anything already in the HashTable, so use
14181460 // NoLookupCallback.
@@ -1434,11 +1476,17 @@ void EPBucket::loadPreparedSyncWrites(
14341476 EP_LOG_CRITICAL (
14351477 " EPBucket::loadPreparedSyncWrites: scanCtx is null for {}" ,
14361478 epVb.getId ());
1437- return ;
1479+ // No prepares loaded
1480+ return {0 , 0 };
14381481 }
14391482
14401483 auto scanResult = kvStore->scan (scanCtx);
1441- Expects (scanResult == scan_success);
1484+
1485+ // If we abort our scan early due to reaching the HPS then the scan result
1486+ // will be failure but we will have scanned correctly.
1487+ if (storageCB->getStatus () != ENGINE_ENOMEM) {
1488+ Expects (scanResult == scan_success);
1489+ }
14421490
14431491 kvStore->destroyScanContext (scanCtx);
14441492
@@ -1451,7 +1499,7 @@ void EPBucket::loadPreparedSyncWrites(
14511499
14521500 // Insert all outstanding Prepares into the VBucket (HashTable &
14531501 // DurabilityMonitor).
1454- std::vector<queued_item> prepares;
1502+ prepares.reserve(storageCB->outstandingPrepares.size()) ;
14551503 for (auto & prepare : storageCB->outstandingPrepares) {
14561504 prepares.emplace_back (std::move (prepare.second ));
14571505 }
@@ -1461,15 +1509,9 @@ void EPBucket::loadPreparedSyncWrites(
14611509 return a->getBySeqno () < b->getBySeqno ();
14621510 });
14631511
1464- // Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1465- auto vbState = kvStore->getVBucketState (epVb.getId ());
1466- if (!vbState) {
1467- throw std::logic_error (" EPBucket::loadPreparedSyncWrites: processing " +
1468- epVb.getId ().to_string () +
1469- " , but found no vbucket_state" );
1470- }
1471-
1512+ auto numPrepares = prepares.size();
14721513 epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares));
1514+ return {storageCB->itemsVisited , numPrepares};
14731515}
14741516
14751517ValueFilter EPBucket::getValueFilterForCompressionMode () {
0 commit comments