@@ -1504,8 +1504,7 @@ ENGINE_ERROR_CODE VBucket::set(
15041504 // that requires the item's info
15051505 if ((v == nullptr || v->isTempInitialItem ()) &&
15061506 (eviction == EvictionPolicy::Full) &&
1507- ((itm.getCas () != 0 ) ||
1508- storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
1507+ (cas_op || storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
15091508 // Check Bloomfilter's prediction
15101509 if (!maybeKeyExistsInFilter (itm.getKey ())) {
15111510 maybeKeyExists = false ;
@@ -3024,53 +3023,38 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
30243023 const VBQueueItemCtx& queueItmCtx,
30253024 cb::StoreIfStatus storeIfStatus,
30263025 bool maybeKeyExists) {
3027- if (v) {
3028- if (v->isPending ()) {
3029- // It is not valid for an active vBucket to attempt to overwrite an
3030- // in flight SyncWrite. If this vBucket is not active, we are
3031- // allowed to overwrite an in flight SyncWrite iff we are receiving
3032- // a disk snapshot. This is due to disk based de-dupe that allows
3033- // only 1 value per key. In this case, the active node may send a
3034- // mutation instead of a commit if it knows that this replica may be
3035- // missing a prepare. This code allows this mutation to be accepted
3036- // and overwrites the existing prepare.
3037- if (getState () == vbucket_state_active ||
3038- !isReceivingDiskSnapshot ()) {
3039- return {MutationStatus::IsPendingSyncWrite, {}};
3040- }
3026+ if (v && v->isPending ()) {
3027+ // It is not valid for an active vBucket to attempt to overwrite an
3028+ // in flight SyncWrite. If this vBucket is not active, we are
3029+ // allowed to overwrite an in flight SyncWrite iff we are receiving
3030+ // a disk snapshot. This is due to disk based de-dupe that allows
3031+ // only 1 value per key. In this case, the active node may send a
3032+ // mutation instead of a commit if it knows that this replica may be
3033+ // missing a prepare. This code allows this mutation to be accepted
3034+ // and overwrites the existing prepare.
3035+ if (getState () == vbucket_state_active || !isReceivingDiskSnapshot ()) {
3036+ return {MutationStatus::IsPendingSyncWrite, {}};
3037+ }
30413038
3042- Expects (itm.isCommitted ());
3043- getPassiveDM ().completeSyncWrite (
3044- itm.getKey (),
3045- PassiveDurabilityMonitor::Resolution::Commit,
3046- v->getBySeqno () /* prepareSeqno */ );
3039+ Expects (itm.isCommitted ());
3040+ getPassiveDM ().completeSyncWrite (
3041+ itm.getKey (),
3042+ PassiveDurabilityMonitor::Resolution::Commit,
3043+ v->getBySeqno () /* prepareSeqno */ );
30473044
3048- // Deal with the already existing prepare
3049- processImplicitlyCompletedPrepare (htRes.pending );
3045+ // Deal with the already existing prepare
3046+ processImplicitlyCompletedPrepare (htRes.pending );
30503047
3051- // Add a new or overwrite the existing mutation
3052- return processSetInner (htRes.pending .getHBL (),
3053- htRes.committed ,
3054- itm,
3055- cas,
3056- allowExisting,
3057- hasMetaData,
3058- queueItmCtx,
3059- storeIfStatus,
3060- maybeKeyExists);
3061- }
3062-
3063- // This is a new SyncWrite, we just want to add a new prepare unless we
3064- // still have a completed prepare (Ephemeral) which we should replace
3065- // instead.
3066- if (!v->isCompleted () && itm.isPending ()) {
3067- VBNotifyCtx notifyCtx;
3068- // We have to modify the StoredValue pointer passed in or we do not
3069- // return the correct cas to the client.
3070- std::tie (v, notifyCtx) = addNewStoredValue (
3071- htRes.getHBL (), itm, queueItmCtx, GenerateRevSeqno::No);
3072- return {MutationStatus::WasClean, notifyCtx};
3073- }
3048+ // Add a new or overwrite the existing mutation
3049+ return processSetInner (htRes.pending .getHBL (),
3050+ htRes.committed ,
3051+ itm,
3052+ cas,
3053+ allowExisting,
3054+ hasMetaData,
3055+ queueItmCtx,
3056+ storeIfStatus,
3057+ maybeKeyExists);
30743058 }
30753059
30763060 return processSetInner (htRes.getHBL (),
@@ -3183,9 +3167,21 @@ VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
31833167
31843168 MutationStatus status;
31853169 VBNotifyCtx notifyCtx;
3186- std::tie (v, status, notifyCtx) =
3187- updateStoredValue (hbl, *v, itm, queueItmCtx);
3170+ // This is a new SyncWrite, we just want to add a new prepare unless we
3171+ // still have a completed prepare (Ephemeral) which we should replace
3172+ // instead.
3173+ if (v->isCommitted () && !v->isCompleted () && itm.isPending ()) {
3174+ std::tie (v, notifyCtx) = addNewStoredValue (
3175+ hbl, itm, queueItmCtx, GenerateRevSeqno::No);
3176+ // Add should always be clean
3177+ status = MutationStatus::WasClean;
3178+ } else {
3179+ std::tie (v, status, notifyCtx) =
3180+ updateStoredValue (hbl, *v, itm, queueItmCtx);
3181+ }
3182+
31883183 return {status, notifyCtx};
3184+
31893185 } else if (cas != 0 ) {
31903186 return {MutationStatus::NotFound, {}};
31913187 } else {
0 commit comments