Skip to content

Commit 6f23cd8

Browse files
committed
MB-35539: Check cas of committed item when creating new prepare
Transactions identified that we were not checking the cas of the committed item when attempting to add a new prepare. This was causing us to lose subdoc operations. Refactor the code to hit all of the normal checks made in VBucket::processSetInner when attempting to add a new prepare. Change-Id: I9cd19b425180c910b7ae84085fad5aee149ee71f Reviewed-on: http://review.couchbase.org/113559 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 9a5827d commit 6f23cd8

File tree

3 files changed

+78
-58
lines changed

3 files changed

+78
-58
lines changed

engines/ep/src/vbucket.cc

Lines changed: 44 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1504,8 +1504,7 @@ ENGINE_ERROR_CODE VBucket::set(
15041504
// that requires the item's info
15051505
if ((v == nullptr || v->isTempInitialItem()) &&
15061506
(eviction == EvictionPolicy::Full) &&
1507-
((itm.getCas() != 0) ||
1508-
storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
1507+
(cas_op || storeIfStatus == cb::StoreIfStatus::GetItemInfo)) {
15091508
// Check Bloomfilter's prediction
15101509
if (!maybeKeyExistsInFilter(itm.getKey())) {
15111510
maybeKeyExists = false;
@@ -3024,53 +3023,38 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
30243023
const VBQueueItemCtx& queueItmCtx,
30253024
cb::StoreIfStatus storeIfStatus,
30263025
bool maybeKeyExists) {
3027-
if (v) {
3028-
if (v->isPending()) {
3029-
// It is not valid for an active vBucket to attempt to overwrite an
3030-
// in flight SyncWrite. If this vBucket is not active, we are
3031-
// allowed to overwrite an in flight SyncWrite iff we are receiving
3032-
// a disk snapshot. This is due to disk based de-dupe that allows
3033-
// only 1 value per key. In this case, the active node may send a
3034-
// mutation instead of a commit if it knows that this replica may be
3035-
// missing a prepare. This code allows this mutation to be accepted
3036-
// and overwrites the existing prepare.
3037-
if (getState() == vbucket_state_active ||
3038-
!isReceivingDiskSnapshot()) {
3039-
return {MutationStatus::IsPendingSyncWrite, {}};
3040-
}
3026+
if (v && v->isPending()) {
3027+
// It is not valid for an active vBucket to attempt to overwrite an
3028+
// in flight SyncWrite. If this vBucket is not active, we are
3029+
// allowed to overwrite an in flight SyncWrite iff we are receiving
3030+
// a disk snapshot. This is due to disk based de-dupe that allows
3031+
// only 1 value per key. In this case, the active node may send a
3032+
// mutation instead of a commit if it knows that this replica may be
3033+
// missing a prepare. This code allows this mutation to be accepted
3034+
// and overwrites the existing prepare.
3035+
if (getState() == vbucket_state_active || !isReceivingDiskSnapshot()) {
3036+
return {MutationStatus::IsPendingSyncWrite, {}};
3037+
}
30413038

3042-
Expects(itm.isCommitted());
3043-
getPassiveDM().completeSyncWrite(
3044-
itm.getKey(),
3045-
PassiveDurabilityMonitor::Resolution::Commit,
3046-
v->getBySeqno() /* prepareSeqno */);
3039+
Expects(itm.isCommitted());
3040+
getPassiveDM().completeSyncWrite(
3041+
itm.getKey(),
3042+
PassiveDurabilityMonitor::Resolution::Commit,
3043+
v->getBySeqno() /* prepareSeqno */);
30473044

3048-
// Deal with the already existing prepare
3049-
processImplicitlyCompletedPrepare(htRes.pending);
3045+
// Deal with the already existing prepare
3046+
processImplicitlyCompletedPrepare(htRes.pending);
30503047

3051-
// Add a new or overwrite the existing mutation
3052-
return processSetInner(htRes.pending.getHBL(),
3053-
htRes.committed,
3054-
itm,
3055-
cas,
3056-
allowExisting,
3057-
hasMetaData,
3058-
queueItmCtx,
3059-
storeIfStatus,
3060-
maybeKeyExists);
3061-
}
3062-
3063-
// This is a new SyncWrite, we just want to add a new prepare unless we
3064-
// still have a completed prepare (Ephemeral) which we should replace
3065-
// instead.
3066-
if (!v->isCompleted() && itm.isPending()) {
3067-
VBNotifyCtx notifyCtx;
3068-
// We have to modify the StoredValue pointer passed in or we do not
3069-
// return the correct cas to the client.
3070-
std::tie(v, notifyCtx) = addNewStoredValue(
3071-
htRes.getHBL(), itm, queueItmCtx, GenerateRevSeqno::No);
3072-
return {MutationStatus::WasClean, notifyCtx};
3073-
}
3048+
// Add a new or overwrite the existing mutation
3049+
return processSetInner(htRes.pending.getHBL(),
3050+
htRes.committed,
3051+
itm,
3052+
cas,
3053+
allowExisting,
3054+
hasMetaData,
3055+
queueItmCtx,
3056+
storeIfStatus,
3057+
maybeKeyExists);
30743058
}
30753059

30763060
return processSetInner(htRes.getHBL(),
@@ -3183,9 +3167,21 @@ VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
31833167

31843168
MutationStatus status;
31853169
VBNotifyCtx notifyCtx;
3186-
std::tie(v, status, notifyCtx) =
3187-
updateStoredValue(hbl, *v, itm, queueItmCtx);
3170+
// This is a new SyncWrite, we just want to add a new prepare unless we
3171+
// still have a completed prepare (Ephemeral) which we should replace
3172+
// instead.
3173+
if (v->isCommitted() && !v->isCompleted() && itm.isPending()) {
3174+
std::tie(v, notifyCtx) = addNewStoredValue(
3175+
hbl, itm, queueItmCtx, GenerateRevSeqno::No);
3176+
// Add should always be clean
3177+
status = MutationStatus::WasClean;
3178+
} else {
3179+
std::tie(v, status, notifyCtx) =
3180+
updateStoredValue(hbl, *v, itm, queueItmCtx);
3181+
}
3182+
31883183
return {status, notifyCtx};
3184+
31893185
} else if (cas != 0) {
31903186
return {MutationStatus::NotFound, {}};
31913187
} else {

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -770,15 +770,15 @@ TEST_P(DurabilityActiveStreamTest,
770770
auto vb = engine->getVBucket(vbid);
771771
const auto key = makeStoredDocKey("key");
772772
const std::string value = "value";
773-
auto item = makePendingItem(
774-
key,
775-
value,
776-
cb::durability::Requirements(cb::durability::Level::Majority,
777-
1 /*timeout*/));
778-
VBQueueItemCtx ctx;
779-
ctx.durability =
780-
DurabilityItemCtx{item->getDurabilityReqs(), nullptr /*cookie*/};
781-
{
773+
{ // Locking scope for collections handle
774+
auto item = makePendingItem(
775+
key,
776+
value,
777+
cb::durability::Requirements(cb::durability::Level::Majority,
778+
1 /*timeout*/));
779+
VBQueueItemCtx ctx;
780+
ctx.durability = DurabilityItemCtx{item->getDurabilityReqs(),
781+
nullptr /*cookie*/};
782782
auto cHandle = vb->lockCollections(item->getKey());
783783
EXPECT_EQ(ENGINE_EWOULDBLOCK,
784784
vb->set(*item, cookie, *engine, {}, cHandle));
@@ -794,7 +794,15 @@ TEST_P(DurabilityActiveStreamTest,
794794
EXPECT_EQ(1, vb->getHighPreparedSeqno());
795795
EXPECT_EQ(1, vb->getHighCompletedSeqno());
796796

797-
{
797+
{ // Locking scope for collections handle
798+
auto item = makePendingItem(
799+
key,
800+
value,
801+
cb::durability::Requirements(cb::durability::Level::Majority,
802+
1 /*timeout*/));
803+
VBQueueItemCtx ctx;
804+
ctx.durability = DurabilityItemCtx{item->getDurabilityReqs(),
805+
nullptr /*cookie*/};
798806
auto cHandle = vb->lockCollections(item->getKey());
799807
EXPECT_EQ(ENGINE_EWOULDBLOCK,
800808
vb->set(*item, cookie, *engine, {}, cHandle));

engines/ep/tests/module_tests/evp_store_durability_test.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,6 +2055,22 @@ TEST_P(DurabilityBucketTest, ActiveToReplicaAndCommit) {
20552055
ASSERT_EQ(ENGINE_SUCCESS, vb.commit(key, 1, 4, vb.lockCollections(key)));
20562056
}
20572057

2058+
TEST_P(DurabilityBucketTest, CasCheckMadeForNewPrepare) {
2059+
setVBucketToActiveWithValidTopology();
2060+
2061+
auto key = makeStoredDocKey("key");
2062+
auto committed = makeCommittedItem(key, "committed");
2063+
2064+
ASSERT_EQ(ENGINE_SUCCESS, store->set(*committed, cookie));
2065+
2066+
auto pending = makePendingItem(key, "pending");
2067+
pending->setCas(123);
2068+
EXPECT_EQ(ENGINE_KEY_EEXISTS, store->set(*pending, cookie));
2069+
2070+
pending->setCas(committed->getCas());
2071+
EXPECT_EQ(ENGINE_EWOULDBLOCK, store->set(*pending, cookie));
2072+
}
2073+
20582074
// Test cases which run against couchstore
20592075
INSTANTIATE_TEST_CASE_P(AllBackends,
20602076
DurabilityCouchstoreBucketTest,

0 commit comments

Comments
 (0)