@@ -1206,8 +1206,20 @@ class EPDiskRollbackCB : public RollbackCB {
12061206 // This is the item in its current state, after the rollback seqno
12071207 // (i.e. the state that we are reverting)
12081208 UniqueItemPtr postRbSeqnoItem (std::move (val.item ));
1209+
12091210 VBucketPtr vb = engine.getVBucket (postRbSeqnoItem->getVBucketId ());
12101211
1212+ // Nuke anything in the prepare namespace, we'll do a "warmup" later
1213+ // which will restore everything to the way it should be and this is
1214+ // far easier than dealing with individual states.
1215+ if (postRbSeqnoItem->isPending () || postRbSeqnoItem->isAbort ()) {
1216+ removeDeletedDoc (*vb, *postRbSeqnoItem);
1217+ return ;
1218+ }
1219+
1220+ EP_LOG_DEBUG (" EPDiskRollbackCB: Handling post rollback item: {}" ,
1221+ *postRbSeqnoItem);
1222+
12111223 // The get value of the item before the rollback seqno
12121224 GetValue preRbSeqnoGetValue =
12131225 engine.getKVBucket ()
@@ -1216,10 +1228,18 @@ class EPDiskRollbackCB : public RollbackCB {
12161228 DiskDocKey{*postRbSeqnoItem},
12171229 postRbSeqnoItem->getVBucketId (),
12181230 GetMetaOnly::No);
1231+
1232+ // This is the item in the state it was before the rollback seqno
1233+ // (i.e. the desired state). null if there was no previous
1234+ // Item.
1235+ UniqueItemPtr preRbSeqnoItem (std::move (preRbSeqnoGetValue.item ));
1236+
12191237 if (preRbSeqnoGetValue.getStatus () == ENGINE_SUCCESS) {
1220- // This is the item in the state it was before the rollback seqno
1221- // (i.e. the desired state)
1222- UniqueItemPtr preRbSeqnoItem (std::move (preRbSeqnoGetValue.item ));
1238+ EP_LOG_DEBUG (
1239+ " EPDiskRollbackCB: Item existed pre-rollback; restoring to "
1240+ " pre-rollback state: {}" ,
1241+ *preRbSeqnoItem);
1242+
12231243 if (preRbSeqnoItem->isDeleted ()) {
12241244 // If the item existed before, but had been deleted, we
12251245 // should delete it now
@@ -1228,9 +1248,33 @@ class EPDiskRollbackCB : public RollbackCB {
12281248 // The item existed before and was not deleted, we need to
12291249 // revert the items state to the preRollbackSeqno state
12301250 MutationStatus mtype = vb->setFromInternal (*preRbSeqnoItem);
1231-
1232- if (mtype == MutationStatus::NoMem) {
1251+ switch (mtype) {
1252+ case MutationStatus::NotFound:
1253+ // NotFound is valid - if the item has been deleted
1254+ // in-memory, but that was not flushed to disk as of
1255+ // post-rollback seqno.
1256+ break ;
1257+ case MutationStatus::WasClean:
1258+ // Item hasn't been modified since it was persisted to disk
1259+ // as of post-rollback seqno.
1260+ break ;
1261+ case MutationStatus::WasDirty:
1262+ // Item was modifed since it was persisted to disk - this
1263+ // is ok because it's just a mutation which has not yet
1264+ // been persisted to disk as of post-rollback seqno.
1265+ break ;
1266+ case MutationStatus::NoMem:
12331267 setStatus (ENGINE_ENOMEM);
1268+ break ;
1269+ case MutationStatus::InvalidCas:
1270+ case MutationStatus::IsLocked:
1271+ case MutationStatus::NeedBgFetch:
1272+ case MutationStatus::IsPendingSyncWrite:
1273+ std::stringstream ss;
1274+ ss << " EPDiskRollbackCB: Unexpected status:"
1275+ << to_string (mtype)
1276+ << " after setFromInternal for item:" << *preRbSeqnoItem;
1277+ throw std::logic_error (ss.str ());
12341278 }
12351279
12361280 // If we are rolling back a deletion then we should increment
@@ -1244,50 +1288,17 @@ class EPDiskRollbackCB : public RollbackCB {
12441288 .lock (preRbSeqnoItem->getKey ())
12451289 .incrementDiskCount ();
12461290 }
1247-
1248- if (postRbSeqnoItem->isCommitSyncWrite () ||
1249- postRbSeqnoItem->isAbort ()) {
1250- rollbackSyncWrite (*vb, *postRbSeqnoItem);
1251- }
12521291 }
12531292 } else if (preRbSeqnoGetValue.getStatus () == ENGINE_KEY_ENOENT) {
12541293 // If the item did not exist before we should delete it now
12551294 removeDeletedDoc (*vb, *postRbSeqnoItem);
1256-
1257- // If we are rolling back a commit that did not exist on disk
1258- // before then we need to requeue a prepare
1259- if (postRbSeqnoItem->isCommitSyncWrite ()) {
1260- rollbackSyncWrite (*vb, *postRbSeqnoItem);
1261- }
12621295 } else {
12631296 EP_LOG_WARN (
12641297 " EPDiskRollbackCB::callback:Unexpected Error Status: {}" ,
12651298 preRbSeqnoGetValue.getStatus ());
12661299 }
12671300 }
12681301
1269- void rollbackSyncWrite (VBucket& vb, const Item& postRbSeqnoItem) {
1270- // Search for prepare in the pre-rollback header
1271- GetValue preRbSeqnoPrepare =
1272- engine.getKVBucket ()
1273- ->getROUnderlying (postRbSeqnoItem.getVBucketId ())
1274- ->getWithHeader (dbHandle,
1275- DiskDocKey (postRbSeqnoItem.getKey (),
1276- true /* prepare*/ ),
1277- postRbSeqnoItem.getVBucketId (),
1278- GetMetaOnly::No);
1279- if (preRbSeqnoPrepare.getStatus () == ENGINE_SUCCESS &&
1280- !preRbSeqnoPrepare.item ->isDeleted ()) {
1281- // Exists before so we are rolling back to prepare
1282- vb.addSyncWriteForRollback (*preRbSeqnoPrepare.item );
1283-
1284- // Store the prepare we need to add back in to the Passive DM for
1285- // post-rollback processing.
1286- queued_item qi (std::make_unique<Item>(*preRbSeqnoPrepare.item ));
1287- preparesToAdd.push_back (qi);
1288- }
1289- }
1290-
12911302 // / Remove a deleted-on-disk document from the VBucket's hashtable.
12921303 void removeDeletedDoc (VBucket& vb, const Item& item) {
12931304 if (vb.removeItemFromMemory (item)) {
@@ -1318,7 +1329,6 @@ RollbackResult EPBucket::doRollback(Vbid vbid, uint64_t rollbackSeqno) {
13181329 auto cb = std::make_shared<EPDiskRollbackCB>(engine);
13191330 KVStore* rwUnderlying = vbMap.getShardByVbId (vbid)->getRWUnderlying ();
13201331 auto result = rwUnderlying->rollback (vbid, rollbackSeqno, cb);
1321- result.preparesToAdd = cb->preparesToAdd ;
13221332 return result;
13231333}
13241334
0 commit comments