@@ -76,13 +76,6 @@ void logWarmupStats(EPBucket& epstore) {
7676 megabytes_per_seconds);
7777}
7878
79- /**
80- * Returns the ValueFilter to use for KVStore scans, given the bucket
81- * compression mode.
82- */
83- static ValueFilter getValueFilterForCompressionMode (
84- const BucketCompressionMode& compressionMode);
85-
8679// ////////////////////////////////////////////////////////////////////////////
8780// //
8881// Helper class used to insert data into the epstore //
@@ -1220,119 +1213,17 @@ void Warmup::scheduleLoadPreparedSyncWrites() {
12201213}
12211214
12221215void Warmup::loadPreparedSyncWrites (uint16_t shardId) {
1223- // Perform an in-order scan of the seqno index.
1224- // a) For each Prepared item found, add to a map of outstanding Prepares.
1225- // b) For each Committed (via Mutation or Prepare) item, if there's an
1226- // outstanding Prepare then that prepare has already been Committed,
1227- // hence remove it from the map.
1228- //
1229- // At the end of the scan, all outstanding Prepared items (which did not
1230- // have a Commit persisted to disk) will be registered with the Durability
1231- // Monitor.
1232-
1233- /// Disk load callback for scan.
1234- struct LoadSyncWrites : public StatusCallback <GetValue> {
1235- LoadSyncWrites (EPVBucket& vb) : vb(vb) {
1236- }
1237-
1238- void callback (GetValue& val) override {
1239- if (val.item ->isPending ()) {
1240- // Pending item which was not aborted (deleted). Add to
1241- // outstanding Prepare map.
1242- outstandingPrepares.emplace (val.item ->getKey (),
1243- std::move (val.item ));
1244- return ;
1245- }
1246-
1247- if (val.item ->isCommitted ()) {
1248- // Committed item. _If_ there's an outstanding prepared
1249- // SyncWrite, remove it (as it has already been committed).
1250- outstandingPrepares.erase (val.item ->getKey ());
1251- return ;
1252- }
1253- }
1254-
1255- EPVBucket& vb;
1256-
1257- /// Map of Document key -> outstanding (not yet Committed / Aborted)
1258- /// prepares.
1259- std::unordered_map<StoredDocKey, std::unique_ptr<Item>>
1260- outstandingPrepares;
1261- };
1262-
12631216 for (const auto vbid : shardVbIds[shardId]) {
1264- const auto start = std::chrono::steady_clock::now ();
12651217 auto itr = warmedUpVbuckets.find (vbid.get ());
12661218 if (itr == warmedUpVbuckets.end ()) {
12671219 continue ;
12681220 }
1269- auto & epVb = dynamic_cast <EPVBucket&>(*(itr->second ));
1270-
1271- auto storageCB = std::make_shared<LoadSyncWrites>(epVb);
1272-
1273- // Don't expect to find anything already in the HashTable, so use
1274- // NoLookupCallback.
1275- auto cacheCB = std::make_shared<NoLookupCallback>();
1276-
1277- // @todo-durability: We can optimise this by starting the scan at the
1278- // high_committed_seqno - all earlier prepares would have been committed
1279- // (or were aborted).
1280- uint64_t startSeqno = 0 ;
1281-
1282- auto * kvStore = store.getROUnderlyingByShard (shardId);
1283- // Use ALL_ITEMS filter for the scan. NO_DELETES is insufficient
1284- // because (committed) SyncDeletes manifest as a prepared_sync_write
1285- // (doc on disk not deleted) followed by a commit_sync_write (which
1286- // *is* marked as deleted as that's the resulting state).
1287- // We need to see that Commit, hence ALL_ITEMS.
1288- const auto docFilter = DocumentFilter::ALL_ITEMS;
1289- const auto valFilter = getValueFilterForCompressionMode (
1290- store.getEPEngine ().getCompressionMode ());
1291- auto * scanCtx = kvStore->initScanContext (
1292- storageCB, cacheCB, vbid, startSeqno, docFilter, valFilter);
1293-
1294- // storage problems can lead to a null context, kvstore logs details
1295- if (!scanCtx) {
1296- EP_LOG_CRITICAL (
1297- " Warmup::loadPreparedSyncWrites: scanCtx is null for {}" , vbid);
1298- continue ;
1299- }
1300-
1301- auto scanResult = kvStore->scan (scanCtx);
1302- Expects (scanResult == scan_success);
1303-
1304- kvStore->destroyScanContext (scanCtx);
1305-
1306- EP_LOG_DEBUG (
1307- " Warmup::loadPreparedSyncWrites: Identified {} outstanding "
1308- " prepared SyncWrites for {} in {}" ,
1309- storageCB->outstandingPrepares .size (),
1310- vbid,
1311- cb::time2text (std::chrono::steady_clock::now () - start));
1312-
1313- // Insert all outstanding Prepares into the VBucket (HashTable &
1314- // DurabilityMonitor).
1315- std::vector<queued_item> prepares;
1316- for (auto & prepare : storageCB->outstandingPrepares ) {
1317- prepares.emplace_back (std::move (prepare.second ));
1318- }
1319- // Sequence must be sorted by seqno (ascending) for DurabilityMonitor.
1320- std::sort (prepares.begin (),
1321- prepares.end (),
1322- [](const auto & a, const auto & b) {
1323- return a->getBySeqno () < b->getBySeqno ();
1324- });
1325-
1326- // Need the HPS/HCS so the DurabilityMonitor can be fully resumed
1327- auto vbState = shardVbStates[shardId].find (vbid);
1328- if (vbState == shardVbStates[shardId].end ()) {
1329- throw std::logic_error (
1330- " Warmup::loadPreparedSyncWrites: processing " +
1331- vbid.to_string () + " , but found no vbucket_state" );
1332- }
1333- const vbucket_state& vbs = vbState->second ;
13341221
1335- epVb.restoreOutstandingPreparesFromWarmup (vbs, std::move (prepares));
1222+ // Our EPBucket function will do the load for us as we re-use the code
1223+ // for rollback.
1224+ auto & vb = *(itr->second );
1225+ folly::SharedMutex::WriteHolder vbStateLh (vb.getStateLock ());
1226+ store.loadPreparedSyncWrites (vbStateLh, vb);
13361227 }
13371228
13381229 if (++threadtask_count == store.vbMap .getNumShards ()) {
@@ -1572,16 +1463,6 @@ void Warmup::scheduleLoadingKVPairs()
15721463
15731464}
15741465
1575- ValueFilter getValueFilterForCompressionMode (
1576- const BucketCompressionMode& compressionMode) {
1577-
1578- if (compressionMode != BucketCompressionMode::Off) {
1579- return ValueFilter::VALUES_COMPRESSED;
1580- }
1581-
1582- return ValueFilter::VALUES_DECOMPRESSED;
1583- }
1584-
15851466void Warmup::loadKVPairsforShard (uint16_t shardId)
15861467{
15871468 bool maybe_enable_traffic = false ;
@@ -1597,8 +1478,7 @@ void Warmup::loadKVPairsforShard(uint16_t shardId)
15971478 auto cl =
15981479 std::make_shared<LoadValueCallback>(store.vbMap , state.getState ());
15991480
1600- ValueFilter valFilter = getValueFilterForCompressionMode (
1601- store.getEPEngine ().getCompressionMode ());
1481+ ValueFilter valFilter = store.getValueFilterForCompressionMode ();
16021482
16031483 for (const auto vbid : shardVbIds[shardId]) {
16041484 ScanContext* ctx = kvstore->initScanContext (cb, cl, vbid, 0 ,
@@ -1640,8 +1520,7 @@ void Warmup::loadDataforShard(uint16_t shardId)
16401520 auto cl =
16411521 std::make_shared<LoadValueCallback>(store.vbMap , state.getState ());
16421522
1643- ValueFilter valFilter = getValueFilterForCompressionMode (
1644- store.getEPEngine ().getCompressionMode ());
1523+ ValueFilter valFilter = store.getValueFilterForCompressionMode ();
16451524
16461525 for (const auto vbid : shardVbIds[shardId]) {
16471526 ScanContext* ctx = kvstore->initScanContext (cb, cl, vbid, 0 ,
0 commit comments