Skip to content

Commit ee83c61

Browse files
jameseh96daverigby
authored andcommitted
MB-35954: Ensure durable sets with cas compare to the committed cas
When storing with cas, currently the cas is compared against the cas of the stored value which is to be modified, whether it is a completed prepare or a committed item. In persistent buckets, the stored value to be modified is always the previous committed item, as prepares are not kept in the hashtable after completion so this is not an issue. However, in ephemeral buckets, completed prepare stored values remain in the hashtable to be modified by a new durable write, but are not necessarily "in-sync" with the committed value if non-durable writes are present. Because of this, sets need to test the cas against that of the committed stored value _even if_ they are about to modify a completed prepare stored value. Change-Id: I6016d9e4ae373ab8daefd92bc291e02602bb01fb Reviewed-on: http://review.couchbase.org/114812 Reviewed-by: Ben Huddleston <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 5bbe5e8 commit ee83c61

File tree

3 files changed

+73
-29
lines changed

3 files changed

+73
-29
lines changed

engines/ep/src/vbucket.cc

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1784,7 +1784,7 @@ ENGINE_ERROR_CODE VBucket::prepare(
17841784
// Valid duplicate prepare - call processSetInner and skip the
17851785
// SyncWrite checks.
17861786
queueItmCtx.overwritingPrepareSeqno = v->getBySeqno();
1787-
std::tie(status, notifyCtx) = processSetInner(hbl,
1787+
std::tie(status, notifyCtx) = processSetInner(htRes,
17881788
v,
17891789
itm,
17901790
cas,
@@ -3108,7 +3108,7 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31083108
processImplicitlyCompletedPrepare(htRes.pending);
31093109

31103110
// Add a new or overwrite the existing mutation
3111-
return processSetInner(htRes.pending.getHBL(),
3111+
return processSetInner(htRes,
31123112
htRes.committed,
31133113
itm,
31143114
cas,
@@ -3119,7 +3119,7 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31193119
maybeKeyExists);
31203120
}
31213121

3122-
return processSetInner(htRes.getHBL(),
3122+
return processSetInner(htRes,
31233123
v,
31243124
itm,
31253125
cas,
@@ -3131,7 +3131,7 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31313131
}
31323132

31333133
std::pair<MutationStatus, boost::optional<VBNotifyCtx>>
3134-
VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
3134+
VBucket::processSetInner(HashTable::FindCommitResult& htRes,
31353135
StoredValue*& v,
31363136
Item& itm,
31373137
uint64_t cas,
@@ -3140,7 +3140,7 @@ VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
31403140
const VBQueueItemCtx& queueItmCtx,
31413141
cb::StoreIfStatus storeIfStatus,
31423142
bool maybeKeyExists) {
3143-
if (!hbl.getHTLock()) {
3143+
if (!htRes.getHBL().getHTLock()) {
31443144
throw std::invalid_argument(
31453145
"VBucket::processSet: htLock not held for " +
31463146
getId().to_string());
@@ -3183,28 +3183,34 @@ VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
31833183
}
31843184
}
31853185

3186-
if (v) {
3187-
if (!allowExisting && !v->isTempItem() && !v->isDeleted()) {
3186+
// need to test cas and locking against the committed value
3187+
// explicitly, as v may be a completed prepare to be modified
3188+
// containing an unrelated cas, deleted status etc.
3189+
auto* committed = htRes.committed;
3190+
if (committed) {
3191+
if (!allowExisting && !committed->isTempItem() &&
3192+
!committed->isDeleted()) {
31883193
return {MutationStatus::InvalidCas, {}};
31893194
}
3190-
if (v->isLocked(ep_current_time())) {
3195+
if (committed->isLocked(ep_current_time())) {
31913196
/*
31923197
* item is locked, deny if there is cas value mismatch
31933198
* or no cas value is provided by the user
31943199
*/
3195-
if (cas != v->getCas()) {
3200+
if (cas != committed->getCas()) {
31963201
return {MutationStatus::IsLocked, {}};
31973202
}
31983203
/* allow operation*/
3199-
v->unlock();
3200-
} else if (cas && cas != v->getCas()) {
3201-
if (v->isTempNonExistentItem()) {
3204+
committed->unlock();
3205+
} else if (cas && cas != committed->getCas()) {
3206+
if (committed->isTempNonExistentItem()) {
32023207
// This is a temporary item which marks a key as non-existent;
32033208
// therefore specifying a non-matching CAS should be exposed
32043209
// as item not existing.
32053210
return {MutationStatus::NotFound, {}};
32063211
}
3207-
if ((v->isTempDeletedItem() || v->isDeleted()) && !itm.isDeleted()) {
3212+
if ((committed->isTempDeletedItem() || committed->isDeleted()) &&
3213+
!itm.isDeleted()) {
32083214
// Existing item is deleted, and we are not replacing it with
32093215
// a (different) deleted value - return not existing.
32103216
return {MutationStatus::NotFound, {}};
@@ -3214,47 +3220,49 @@ VBucket::processSetInner(const HashTable::HashBucketLock& hbl,
32143220
return {MutationStatus::InvalidCas, {}};
32153221
}
32163222
if (!hasMetaData) {
3217-
itm.setRevSeqno(v->getRevSeqno() + 1);
3223+
itm.setRevSeqno(committed->getRevSeqno() + 1);
32183224
/* MB-23530: We must ensure that a replace operation (i.e.
32193225
* set with a CAS) /fails/ if the old document is deleted; it
32203226
* logically "doesn't exist". However, if the new value is deleted
32213227
* this op is a /delete/ with a CAS and we must permit a
32223228
* deleted -> deleted transition for Deleted Bodies.
32233229
*/
3224-
if (cas && (v->isDeleted() || v->isTempDeletedItem()) &&
3230+
if (cas &&
3231+
(committed->isDeleted() || committed->isTempDeletedItem()) &&
32253232
!itm.isDeleted()) {
32263233
return {MutationStatus::NotFound, {}};
32273234
}
32283235
}
3236+
} else if (cas != 0) {
3237+
// if a cas has been specified but there is no committed item
3238+
// the op should fail
3239+
return {MutationStatus::NotFound, {}};
3240+
}
32293241

3230-
MutationStatus status;
3231-
VBNotifyCtx notifyCtx;
3242+
MutationStatus status;
3243+
VBNotifyCtx notifyCtx;
3244+
if (v) {
32323245
// This is a new SyncWrite, we just want to add a new prepare unless we
32333246
// still have a completed prepare (Ephemeral) which we should replace
32343247
// instead.
32353248
if (v->isCommitted() && !v->isCompleted() && itm.isPending()) {
32363249
std::tie(v, notifyCtx) = addNewStoredValue(
3237-
hbl, itm, queueItmCtx, GenerateRevSeqno::No);
3250+
htRes.getHBL(), itm, queueItmCtx, GenerateRevSeqno::No);
32383251
// Add should always be clean
32393252
status = MutationStatus::WasClean;
32403253
} else {
32413254
std::tie(v, status, notifyCtx) =
3242-
updateStoredValue(hbl, *v, itm, queueItmCtx);
3255+
updateStoredValue(htRes.getHBL(), *v, itm, queueItmCtx);
32433256
}
3244-
3245-
return {status, notifyCtx};
3246-
3247-
} else if (cas != 0) {
3248-
return {MutationStatus::NotFound, {}};
32493257
} else {
3250-
VBNotifyCtx notifyCtx;
32513258
auto genRevSeqno = hasMetaData ? GenerateRevSeqno::No :
32523259
GenerateRevSeqno::Yes;
3253-
std::tie(v, notifyCtx) =
3254-
addNewStoredValue(hbl, itm, queueItmCtx, genRevSeqno);
3260+
std::tie(v, notifyCtx) = addNewStoredValue(
3261+
htRes.getHBL(), itm, queueItmCtx, genRevSeqno);
32553262
itm.setRevSeqno(v->getRevSeqno());
3256-
return {MutationStatus::WasClean, notifyCtx};
3263+
status = MutationStatus::WasClean;
32573264
}
3265+
return {status, notifyCtx};
32583266
}
32593267

32603268
std::pair<AddStatus, boost::optional<VBNotifyCtx>> VBucket::processAdd(

engines/ep/src/vbucket.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1700,7 +1700,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
17001700
* Inner function for processSet. Allows overwriting of in-flight prepares.
17011701
*/
17021702
std::pair<MutationStatus, boost::optional<VBNotifyCtx>> processSetInner(
1703-
const HashTable::HashBucketLock& hbl,
1703+
HashTable::FindCommitResult& htRes,
17041704
StoredValue*& v,
17051705
Item& itm,
17061706
uint64_t cas,

engines/ep/tests/module_tests/evp_store_durability_test.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,42 @@ TEST_P(DurabilityBucketTest, SyncWriteDelete) {
807807
ASSERT_EQ(1, ckptList.back()->getNumItems());
808808
}
809809

810+
TEST_P(DurabilityBucketTest, SyncWriteComparesToCorrectCas) {
811+
setVBucketStateAndRunPersistTask(
812+
vbid,
813+
vbucket_state_active,
814+
{{"topology", nlohmann::json::array({{"active", "replica"}})}});
815+
816+
auto& vb = *store->getVBucket(vbid);
817+
818+
// prepare SyncWrite and commit.
819+
auto key = makeStoredDocKey("key");
820+
auto pending = makePendingItem(key, "value");
821+
ASSERT_EQ(ENGINE_SYNC_WRITE_PENDING, store->set(*pending, cookie));
822+
ASSERT_EQ(ENGINE_SUCCESS,
823+
vb.commit(key,
824+
pending->getBySeqno(),
825+
{} /*commitSeqno*/,
826+
vb.lockCollections(key)));
827+
828+
vb.processResolvedSyncWrites();
829+
830+
// Non-durable write to same key
831+
832+
auto committed = makeCommittedItem(key, "some_other_value");
833+
ASSERT_EQ(ENGINE_SUCCESS, store->set(*committed, cookie));
834+
835+
// get cas
836+
uint64_t cas = store->get(key, vbid, cookie, {}).item->getCas();
837+
838+
// now do another SyncWrite with a cas
839+
pending = makePendingItem(key, "new_value");
840+
pending->setCas(cas);
841+
842+
// Should succeed - has correct cas
843+
ASSERT_EQ(ENGINE_SYNC_WRITE_PENDING, store->set(*pending, cookie));
844+
}
845+
810846
void DurabilityEPBucketTest::verifyOnDiskItemCount(VBucket& vb,
811847
uint64_t expectedValue) {
812848
// skip for rocksdb as it treats every mutation as an insertion

0 commit comments

Comments
 (0)