Skip to content

Commit e8647d1

Browse files
committed
MB-59746 [1/2]: Restore CAS to original value after unlock()
+Problem+ When a document is locked via GetLocked, KV-Engine generates a new "locked CAS", stores it in memory, and returns it to the client for it to use in a subsequent Mutation(CAS=locked_CAS) or Unlock() operation. However, when the document is unlocked without modification (lock times out, or an explicit Unlock() call is made), the previous CAS value is *not* restored. As such, the in-memory representation of the document has an artificially higher CAS than the document "really" has. When using LWW XDCR this is problematic; as it means that a document could have an artificially higher CAS than it "really" has, and hence conflict resolution could incorrectly fail to update the document. Consider the following scenario: 1. A LWW XDCR relation from source S to destination D is established, and is performing _optimistic_ conflict resolution. Buckets are in sync, assume CAS of all documents is from time T=0. 2. At time T=10, a Mutation operation occurs on the source bucket S against the doc X. 3. At T=11 a GetLocked operation occurs on the destination Bucket D against the same doc. 4. At T=12 the destination bucket D document is Unlocked, leaving the CAS with the advanced value from T=11. 5. Due to XDCR processing lag / network latency, the SetWithMeta() arrives at {{D}} at T=13 - after the document has been unlocked. KV-Engine's conflict resolver will compare the (locked) CAS at the destination (11) with the incoming CAS (10), and decide the destination should win - and the update will not be applied. This is incorrect - we should have used CAS value before the document was locked (T=0) for conflict resolution. +Solution+ To address this, ep-engine needs to keep a copy of the original (pre-lock) CAS, and restore the documents' CAS to this when the document is unlocked. This patch does this in a straightforward manner - add a second locked_CAS field to StoredValue to fix this issue. This has the downside of increasing the size of per-document in-memory metadata by 8 Bytes. An (space) optimisated version will follow in a subsequent patch. Change-Id: Icc274626dd5c1269bc1df65a2e26304df4aaf554 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/201510 Tested-by: Dave Rigby <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Trond Norbye <[email protected]>
1 parent f11cd84 commit e8647d1

File tree

10 files changed

+235
-28
lines changed

10 files changed

+235
-28
lines changed

engines/ep/src/kv_bucket.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1958,7 +1958,7 @@ cb::engine_errc KVBucket::unlockKey(const DocKey& key,
19581958
return cb::engine_errc::no_such_key;
19591959
}
19601960
if (v->isLocked(currentTime)) {
1961-
if (v->getCas() == cas) {
1961+
if (v->getCasForWrite(currentTime) == cas) {
19621962
v->unlock();
19631963
return cb::engine_errc::success;
19641964
}

engines/ep/src/stored-value.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ static std::string getSystemEventsValueFromStoredValue(const StoredValue& sv) {
516516
}
517517

518518
std::ostream& operator<<(std::ostream& os, const StoredValue& sv) {
519+
const auto now = ep_current_time();
519520
// type, address
520521
os << (sv.isOrdered() ? "OSV @" : " SV @") << &sv << " ";
521522

@@ -529,7 +530,7 @@ std::ostream& operator<<(std::ostream& os, const StoredValue& sv) {
529530
os << (sv.isDirty() ? 'W' : '.');
530531
os << (sv.isDeleted() ? 'D' : '.');
531532
os << (sv.isResident() ? 'R' : '.');
532-
os << (sv.isLocked(ep_current_time()) ? 'L' : '.');
533+
os << (sv.isLocked(now) ? 'L' : '.');
533534
switch (sv.getCommitted()) {
534535
case CommittedState::CommittedViaMutation:
535536
os << "Cm";
@@ -573,6 +574,9 @@ std::ostream& operator<<(std::ostream& os, const StoredValue& sv) {
573574
os << std::dec << "seq:" << uint64_t(sv.getBySeqno())
574575
<< " rev:" << sv.getRevSeqno();
575576
os << " cas:" << sv.getCas();
577+
if (sv.isLocked(now)) {
578+
os << " locked_cas:" << sv.locked_cas;
579+
}
576580
os << " key:\"" << sv.getKey() << "\"";
577581
if (sv.isOrdered() && sv.isDeleted()) {
578582
os << " del_time:"

engines/ep/src/stored-value.h

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,20 @@ class StoredValue {
454454
return cas;
455455
}
456456

457+
/**
458+
* Get the CAS needed to modify this SV using optimistic concurrency
459+
* control. This is normally the current CAS (the CAS of the last mutation
460+
* to the doc), but if the document is locked then it is the special
461+
* locked_CAS generated by getLocked() and returned to the caller.
462+
* @param curtime The current time, used to check if lock has expired
463+
*/
464+
uint64_t getCasForWrite(rel_time_t curtime) const {
465+
if (isLocked(curtime)) {
466+
return locked_cas;
467+
}
468+
return cas;
469+
}
470+
457471
/**
458472
* Set a new CAS ID.
459473
*/
@@ -462,15 +476,17 @@ class StoredValue {
462476
}
463477

464478
/**
465-
* Lock this item until the given time.
479+
* Lock this item until the given time, setting the locked CAS to
480+
* the specified value.
466481
*/
467-
void lock(rel_time_t expiry) {
482+
void lock(rel_time_t expiry, uint64_t cas) {
468483
if (isDeleted()) {
469484
// Cannot lock Deleted items.
470485
throw std::logic_error(
471486
"StoredValue::lock: Called on Deleted item");
472487
}
473488
lock_expiry_or_delete_or_complete_time.lock_expiry = expiry;
489+
locked_cas = cas;
474490
}
475491

476492
/**
@@ -482,6 +498,7 @@ class StoredValue {
482498
return;
483499
}
484500
lock_expiry_or_delete_or_complete_time.lock_expiry = 0;
501+
locked_cas = 0;
485502
}
486503

487504
/**
@@ -1058,6 +1075,10 @@ class StoredValue {
10581075
// the tag.
10591076
UniquePtr chain_next_or_replacement; // 8 bytes (2-byte tag, 6 byte address)
10601077
uint64_t cas; //!< CAS identifier.
1078+
// The CAS to use to modify/unlock a locked document. Stored separately from
1079+
// 'cas' as we must preserve the original CAS in the event the locked
1080+
// document is unlocked without being modified.
1081+
uint64_t locked_cas;
10611082
// bySeqno is atomic primarily for TSAN, which would flag that we write/read
10621083
// this in ephemeral backfills with different locks (which is true, but the
10631084
// access is we believe actually safe)

engines/ep/src/vbucket.cc

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3124,11 +3124,10 @@ GetValue VBucket::getLocked(rel_time_t currentTime,
31243124
}
31253125

31263126
// acquire lock and increment cas value
3127-
v->lock(currentTime + lockTimeout);
3127+
v->lock(currentTime + lockTimeout, nextHLCCas());
31283128

31293129
auto it = v->toItem(getId());
3130-
it->setCas(nextHLCCas());
3131-
v->setCas(it->getCas());
3130+
it->setCas(v->getCasForWrite(currentTime));
31323131

31333132
return GetValue(std::move(it));
31343133
}
@@ -3491,17 +3490,18 @@ std::pair<MutationStatus, std::optional<VBNotifyCtx>> VBucket::processSetInner(
34913490
!committed->isDeleted()) {
34923491
return {MutationStatus::InvalidCas, {}};
34933492
}
3494-
if (committed->isLocked(ep_current_time())) {
3493+
const auto now = ep_current_time();
3494+
if (committed->isLocked(now)) {
34953495
/*
34963496
* item is locked, deny if there is cas value mismatch
34973497
* or no cas value is provided by the user
34983498
*/
3499-
if (cas != committed->getCas()) {
3499+
if (cas != committed->getCasForWrite(now)) {
35003500
return {MutationStatus::IsLocked, {}};
35013501
}
35023502
/* allow operation*/
35033503
committed->unlock();
3504-
} else if (cas && cas != committed->getCas()) {
3504+
} else if (cas && cas != committed->getCasForWrite(now)) {
35053505
if (committed->isTempNonExistentItem()) {
35063506
// This is a temporary item which marks a key as non-existent;
35073507
// therefore specifying a non-matching CAS should be exposed
@@ -3771,15 +3771,19 @@ VBucket::processSoftDeleteInner(const HashTable::HashBucketLock& hbl,
37713771
return std::make_tuple(MutationStatus::NeedBgFetch, &v, empty);
37723772
}
37733773

3774-
if (v.isLocked(ep_current_time())) {
3775-
if (cas != v.getCas()) {
3774+
// Perform CAS check. If document is locked then the request cas must
3775+
// match the locked CAS (i.e. the same CAS returned from the getLocked()
3776+
// operation), if not locked then check against the normal cas (last
3777+
// modified time) if CAS specified otherwise no CAS permitted.
3778+
const auto now = ep_current_time();
3779+
if (v.isLocked(now)) {
3780+
if (cas != v.getCasForWrite(now)) {
37763781
return std::make_tuple(MutationStatus::IsLocked, &v, empty);
37773782
}
3778-
v.unlock();
3779-
}
3780-
3781-
if (cas != 0 && cas != v.getCas()) {
3782-
return std::make_tuple(MutationStatus::InvalidCas, &v, empty);
3783+
} else {
3784+
if (cas != 0 && cas != v.getCas()) {
3785+
return std::make_tuple(MutationStatus::InvalidCas, &v, empty);
3786+
}
37833787
}
37843788

37853789
/* allow operation */

engines/ep/tests/ep_testsuite_xdcr.cc

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3173,6 +3173,75 @@ static enum test_result test_expiration_options(EngineIface* h) {
31733173
return SUCCESS;
31743174
}
31753175

3176+
// Regression test for MB-59744 - a SetWithMeta against a document which was
3177+
// previously locked but has been unlocked without modification should perform
3178+
// conflict resolution using the original CAS before the document was locked.
3179+
static enum test_result test_mb_59744_set_meta_lww_after_locked(EngineIface* h) {
3180+
3181+
// 1. Store an initial document so we can lock it. Create with a known CAS
3182+
// value so we can later perform a second SetWithMeta which _should_
3183+
// win conflict resolution.
3184+
ItemMetaData itemMeta;
3185+
itemMeta.revSeqno = 1;
3186+
itemMeta.cas = 1000;
3187+
3188+
std::string key{"key"};
3189+
Vbid vb{0};
3190+
checkeq(cb::engine_errc::success,
3191+
set_with_meta(h,
3192+
key.data(),
3193+
key.size(),
3194+
{},
3195+
0,
3196+
vb,
3197+
&itemMeta,
3198+
0,
3199+
FORCE_ACCEPT_WITH_META_OPS),
3200+
"Expected item to be stored");
3201+
3202+
// 2. Lock the document. This will set the locked_CAS to the current HLC
3203+
// time, which is nanoseconds since unix epoch, so much larger than
3204+
// the CAS we just set (1000).
3205+
auto* cookie = testHarness->create_cookie(h);
3206+
auto locked = getl(h, cookie, key.data(), vb, 0);
3207+
checkeq(cb::engine_errc::success, locked.first, "Expected GetLocked to succeed");
3208+
checkgt(locked.second->getCas(), itemMeta.cas, "Expected locked CAs to be greater than current CAS");
3209+
3210+
// 3. Unlock the document. Before the fix for MB-59744, this would leave
3211+
// the document having the same increased locked_CAS.
3212+
checkeq(cb::engine_errc::success,
3213+
unl(h, cookie, key.c_str(), vb, locked.second->getCas()),
3214+
"Unlock should have succeedd");
3215+
3216+
// 4. Test - attempt to set with meta, using confict resolution, to a
3217+
// CAS value one greater than previous mutation, but crucially less than
3218+
// the temporary locked CAS.
3219+
itemMeta.revSeqno++;
3220+
itemMeta.cas++;
3221+
checkeq(cb::engine_errc::success,
3222+
set_with_meta(h,
3223+
key.data(),
3224+
key.size(),
3225+
{},
3226+
0,
3227+
vb,
3228+
&itemMeta,
3229+
0,
3230+
FORCE_ACCEPT_WITH_META_OPS),
3231+
"Expected SetWithMeta with CAS+1 to succeed");
3232+
3233+
// 5. Check document has indeed been updated with new CAS / seqno.
3234+
cb::EngineErrorMetadataPair meta;
3235+
check(get_meta(h, key.c_str(), meta, cookie), "Expected GetMeta to succeed");
3236+
checkeq(itemMeta.cas, meta.second.cas, "Expected CAS to be updated");
3237+
checkeq(uint64_t(itemMeta.revSeqno), meta.second.seqno,
3238+
"Expected revid to be updated");
3239+
3240+
testHarness->destroy_cookie(cookie);
3241+
3242+
return SUCCESS;
3243+
}
3244+
31763245
// Test manifest //////////////////////////////////////////////////////////////
31773246

31783247
const char* default_dbname = "./ep_testsuite_xdcr.db";
@@ -3519,4 +3588,12 @@ BaseTestCase testsuite_testcases[] = {
35193588
prepare,
35203589
cleanup),
35213590

3522-
TestCase(nullptr, nullptr, nullptr, nullptr, nullptr, prepare, cleanup)};
3591+
TestCase("MB-59744 SetMeta LWW after locked",
3592+
test_mb_59744_set_meta_lww_after_locked,
3593+
test_setup,
3594+
teardown,
3595+
"conflict_resolution_type=lww",
3596+
prepare,
3597+
cleanup),
3598+
3599+
TestCase(nullptr, nullptr, nullptr, nullptr, nullptr, prepare, cleanup)};

engines/ep/tests/module_tests/kv_bucket_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1421,11 +1421,13 @@ TEST_P(KVBucketParamTest, unlockKeyTempDeletedTest) {
14211421
GetValue gv =
14221422
store->getAndUpdateTtl(key, vbid, cookie, ep_real_time() + 10000);
14231423
EXPECT_EQ(cb::engine_errc::success, gv.getStatus());
1424+
auto docCas = gv.item->getCas();
14241425

14251426
gv = store->getLocked(key, vbid, ep_current_time(), 10, cookie);
14261427
EXPECT_EQ(cb::engine_errc::success, gv.getStatus());
14271428

1428-
itm.setCas(gv.item->getCas());
1429+
// Need the "real" documents' CAS for expiry.
1430+
itm.setCas(docCas);
14291431
store->processExpiredItem(itm, ep_real_time() + 10001, ExpireBy::Pager);
14301432

14311433
flushVBucketToDiskIfPersistent(vbid, 1);

engines/ep/tests/module_tests/stored_value_test.cc

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,9 @@ TYPED_TEST(ValueTest, freqCounterNotReset) {
388388
/// size (we've carefully crafted them to be as efficient as possible).
389389
TEST(StoredValueTest, expectedSize) {
390390
#ifdef CB_MEMORY_INEFFICIENT_TAGGED_PTR
391-
const long expected_size = 64;
391+
const long expected_size = 72;
392392
#else
393-
const long expected_size = 56;
393+
const long expected_size = 64;
394394
#endif
395395
EXPECT_EQ(expected_size, sizeof(StoredValue))
396396
<< "Unexpected change in StoredValue fixed size";
@@ -419,16 +419,61 @@ TYPED_TEST(ValueTest, MB_32568) {
419419
EXPECT_EQ(DeleteSource::TTL, this->sv->getDeletionSource());
420420
}
421421

422+
// Validate the CAS for writing changes when locked, and is restored afterwards
423+
// when unlocked.
424+
TYPED_TEST(ValueTest, LockedCas) {
425+
this->sv->setCas(123);
426+
ASSERT_EQ(123, this->sv->getCasForWrite(0))
427+
<< "CAS used for writing should be main CAS if not locked";
428+
429+
rel_time_t lock_expiry{10};
430+
this->sv->lock(lock_expiry, 456);
431+
EXPECT_EQ(456, this->sv->getCasForWrite(lock_expiry - 1))
432+
<< "CAS used for writing should have changed to locked CAS while "
433+
"lock still valid";
434+
EXPECT_EQ(123, this->sv->getCas())
435+
<< "main CAS should be unchanged when locked";
436+
437+
this->sv->unlock();
438+
EXPECT_EQ(123, this->sv->getCasForWrite(lock_expiry - 1))
439+
<< "CAS used for writing should have reverted to previous VAS "
440+
"after explicit unlock";
441+
EXPECT_EQ(123, this->sv->getCas())
442+
<< "main CAS should be unchanged after explicit unlock";
443+
}
444+
445+
// Validate the CAS for writing changes when locked, and is restored when lock
446+
// expires.
447+
TYPED_TEST(ValueTest, LockedCasExpired) {
448+
this->sv->setCas(123);
449+
ASSERT_EQ(123, this->sv->getCasForWrite(0))
450+
<< "CAS used for writing should be main CAS if not locked";
451+
452+
rel_time_t lock_expiry{10};
453+
this->sv->lock(lock_expiry, 456);
454+
EXPECT_EQ(456, this->sv->getCasForWrite(lock_expiry - 1))
455+
<< "CAS used for writing should have changed to locked CAS while "
456+
"lock still valid";
457+
EXPECT_EQ(123, this->sv->getCas())
458+
<< "main CAS should be unchanged when locked";
459+
460+
EXPECT_EQ(123, this->sv->getCasForWrite(lock_expiry + 1))
461+
<< "CAS used for writing should have reverted to previous VAS "
462+
"after lock expires";
463+
EXPECT_EQ(123, this->sv->getCas())
464+
<< "main CAS should be unchanged after locked expires";
465+
}
466+
422467
/**
423468
* Test fixture for OrderedStoredValue-only tests.
424469
*/
425470
class OrderedStoredValueTest : public ValueTest<OrderedStoredValueFactory> {};
426471

427472
TEST_F(OrderedStoredValueTest, expectedSize) {
428473
#ifdef CB_MEMORY_INEFFICIENT_TAGGED_PTR
429-
const long expected_size = 80;
474+
const long expected_size = 88;
430475
#else
431-
const long expected_size = 80;
476+
const long expected_size = 88;
432477
#endif
433478

434479
EXPECT_EQ(expected_size, sizeof(OrderedStoredValue))

include/memcached/protocol_binary.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2192,6 +2192,9 @@ namespace mcbp::cas {
21922192
* The special value used as a wildcard and match all CAS values
21932193
*/
21942194
const uint64_t Wildcard = 0x0;
2195+
2196+
/// The special value returned from Get and similar when document is locked.
2197+
const uint64_t Locked = 0xffff'ffff'ffff'ffff;
21952198
} // namespace mcbp::cas
21962199

21972200
namespace cb::mcbp::request {

0 commit comments

Comments
 (0)