Skip to content

Commit d72f776

Browse files
daverigbypaolococchi
authored andcommitted
MB-33186 [SR]: Use bySeqno scan for warmup of Prepared SyncWrites
Modify the algorithm used by Warmup::loadPreparedSyncWrites() to load any in-flight (not yet committed or aborted) SyncWrites from disk. +Background+ Initially the approach chosen was to identify these prepared SyncWrites by performing a range scan on the byKey index, looking for all mutations in the DurabilityPrepare namespace. This however relies on deleting Prepared SyncWrites from disk whenever they are Committed or Aborted. This is undesirable as it requires an additional item to be flushed every time we perform a commit (the delete of the prepared item). As such we do not delete Prepared items when Committed (only when Aborted as that's necessary to mark they should no longer be considered outstanding). However, this means that Warmup cannot rely on the (not-deleted) Prepared Sync Writes found on disk still being in-flight - they could have already been Committed. +Solution+ Instead of performing a byKey scan, perform an in-order scan of the seqno index: a) For each Prepared item found, add to the Durability Monitor and HashTable. b) For each Committed (via Mutation or Prepare) item, if a prepared item exists in the HashTable (i.e. was added at step (a)) then mark it as Committed in the DurabilityMonitor. At the end of the scan, all in-flight Prepared items (which did not have a Commit persisted to disk) will be registered with the Durability Monitor. Note: The above is functionally correct, however it is potentially inefficient - we must scan the complete bySeqno tree to identify all in-flight SyncWrites. A later patch will introduce an optimization where we can use the high_committed_seqno to constrain the range of seqnos scanned to the end of the tree. Change-Id: I799eb3e6120716ddb979aba90e6cef3f82ada50e Reviewed-on: http://review.couchbase.org/108010 Tested-by: Build Bot <[email protected]> Reviewed-by: Paolo Cocchi <[email protected]>
1 parent 8bea749 commit d72f776

File tree

1 file changed

+81
-26
lines changed

1 file changed

+81
-26
lines changed

engines/ep/src/warmup.cc

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ void logWarmupStats(EPBucket& epstore) {
7575
megabytes_per_seconds);
7676
}
7777

78+
/**
79+
* Returns the ValueFilter to use for KVStore scans, given the bucket
80+
* compression mode.
81+
*/
82+
static ValueFilter getValueFilterForCompressionMode(
83+
const BucketCompressionMode& compressionMode);
84+
7885
//////////////////////////////////////////////////////////////////////////////
7986
// //
8087
// Helper class used to insert data into the epstore //
@@ -1109,36 +1116,84 @@ void Warmup::scheduleLoadPreparedSyncWrites() {
11091116
}
11101117

11111118
void Warmup::loadPreparedSyncWrites(uint16_t shardId) {
1112-
// Perform a range lookup of just prepared namespace
1113-
// For each item found, load into HashTable
1114-
for (const auto vbid : shardVbIds[shardId]) {
1115-
// Need to form start and end keys which encompass _all_ possible
1116-
// keys under the DurabilityPrepare namespace.
1117-
// Start: DurabilityPrepare=true, lowest possible collectionID,
1118-
// zero length key.
1119-
// 0x02 0x00
1120-
auto startKey =
1121-
DiskDocKey{StoredDocKey{"", CollectionID::Default}, true};
1122-
// End: DurabilityPrepare=false, CollectionID following
1123-
// DurabilityPrepare, zero length key:
1124-
// 0x03 0x00
1125-
auto endCid = CollectionID{CollectionID::Reserved3,
1126-
CollectionID::SkipIDVerificationTag{}};
1127-
auto endKey = DiskDocKey{StoredDocKey{"", endCid}, false};
1119+
// Perform an in-order scan of the seqno index.
1120+
// a) For each Prepared item found, add to the Durability Monitor and
1121+
// HashTable.
1122+
// b) For each Committed (via Mutation or Prepare) item, if a prepared
1123+
// item exists in the HashTable (i.e. was added at step (a)) then mark
1124+
// it as Committed in the DurabilityMonitor.
1125+
//
1126+
// At the end of the scan, all in-flight Prepared items (which did not
1127+
// have a Commit persisted to disk) will be registered with the Durability
1128+
// Monitor.
1129+
1130+
/// Disk load callback for scan.
///
/// Invoked once per item returned by the seqno-ordered KVStore scan. Each
/// item is handled according to its committed state:
///  - Pending (prepared) items are inserted into the VBucket.
///  - Committed items cause any prepared value currently held for the same
///    key to be committed in the HashTable.
///  - Items that are neither pending nor committed are ignored.
///
/// The struct stores a reference to the VBucket, so an instance must not
/// outlive the EPVBucket it was constructed with.
1131+
struct LoadSyncWrites : public StatusCallback<GetValue> {
1132+
/// @param vb VBucket which scanned items are loaded into / committed
///           against. Held by reference (not owned).
LoadSyncWrites(EPVBucket& vb) : vb(vb) {
1133+
}
1134+
1135+
/// Process a single item read from disk.
/// @param val Result of the disk read; val.item is released (ownership
///            taken) when the item is pending.
void callback(GetValue& val) override {
1136+
// Should only be called back for non-deleted items.
// NOTE(review): presumably guaranteed by the scan being created with a
// no-deletes document filter - confirm at the call site.
1137+
Expects(!val.item->isDeleted());
1138+
1139+
if (val.item->isPending()) {
1140+
// Pending item which was not aborted (deleted). Load into
1141+
// VBucket.
// Ownership of the item is moved out of `val` into `qi`; val.item must
// not be dereferenced after this point.
1142+
queued_item qi(val.item.release());
1143+
const auto res = vb.insertFromWarmup(*qi,
1144+
/*shouldEject*/ false,
1145+
/*keyMetaDataOnly*/ false);
// Any result other than NotFound is treated as a contract violation.
// NOTE(review): presumably NotFound means no entry for this key was
// already present before the insert - confirm against
// insertFromWarmup().
1146+
Expects(res == MutationStatus::NotFound);
1147+
return;
1148+
}
1149+
1150+
if (val.item->isCommitted()) {
1151+
// Committed item. If there's an outstanding prepared
1152+
// SyncWrite, commit it.
1153+
auto handle = vb.lockCollections(val.item->getKey());
1154+
auto prepared = vb.fetchPreparedValue(handle);
// No prepared value found for this key: nothing to commit.
1155+
if (!prepared.storedValue) {
1156+
return;
1157+
}
1158+
1159+
// Commit using the lock returned alongside the prepared value.
vb.ht.commit(prepared.lock, *prepared.storedValue);
1160+
}
1161+
}
1162+
1163+
/// VBucket being warmed up (non-owning reference).
EPVBucket& vb;
1164+
};
11281165

1166+
for (const auto vbid : shardVbIds[shardId]) {
11291167
auto vb = store.getVBucket(vbid);
11301168
auto& epVb = dynamic_cast<EPVBucket&>(*vb);
11311169

1132-
store.getROUnderlyingByShard(shardId)->getRange(
1133-
vbid, startKey, endKey, [&epVb](GetValue&& gv) {
1134-
EP_LOG_DEBUG("Warmup::loadPreparedSyncWrites: {}",
1135-
*gv.item);
1136-
const auto res =
1137-
epVb.insertFromWarmup(*gv.item,
1138-
/*shouldEject*/ false,
1139-
gv.isPartial());
1140-
Expects(res == MutationStatus::NotFound);
1141-
});
1170+
auto storageCB = std::make_shared<LoadSyncWrites>(epVb);
1171+
1172+
// Don't expect to find anything already in the HashTable, so use
1173+
// NoLookupCallback.
1174+
auto cacheCB = std::make_shared<NoLookupCallback>();
1175+
1176+
// @todo-durability: We can optimise this by starting the scan at the
1177+
// high_committed_seqno - all earlier prepares would have been committed
1178+
// (or were aborted).
1179+
uint64_t startSeqno = 0;
1180+
1181+
auto valFilter = getValueFilterForCompressionMode(
1182+
store.getEPEngine().getCompressionMode());
1183+
1184+
auto* kvStore = store.getROUnderlyingByShard(shardId);
1185+
auto* scanCtx = kvStore->initScanContext(storageCB,
1186+
cacheCB,
1187+
vbid,
1188+
startSeqno,
1189+
DocumentFilter::NO_DELETES,
1190+
valFilter);
1191+
Expects(scanCtx);
1192+
1193+
auto scanResult = kvStore->scan(scanCtx);
1194+
Expects(scanResult == scan_success);
1195+
1196+
kvStore->destroyScanContext(scanCtx);
11421197
}
11431198

11441199
if (++threadtask_count == store.vbMap.getNumShards()) {

0 commit comments

Comments
 (0)