Skip to content

Commit 3f8628f

Browse files
committed
MB-34946: Remove prepare when receiving overwriting mutation
When a replica is backfilling from disk it will often be sent a Mutation instead of a Commit. This Mutation logically commits the prepare that the replica may already have. Currently the code will overwrite the prepare with a mutation. This is not valid as the replica may already have a mutation for this commit and a following lookup in the HashTable would cause a precondition to fail as there would now exist two committed StoredValues in the HashTable. Instead, the replica should either remove the prepare (EP vBuckets) or mark it as completed (Ephemeral vBuckets) and either add a new mutation or overwrite the existing one. Update the setWithMeta and deleteWithMeta functions that are called by the replica to process this mutation instead of a commit to look up both the prepare and committed StoredValues and deal with them appropriately. Change-Id: I0a42616a36bf50e92e67e93746574b616b97dc09 Reviewed-on: http://review.couchbase.org/111812 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 7491c6c commit 3f8628f

File tree

8 files changed

+148
-23
lines changed

8 files changed

+148
-23
lines changed

engines/ep/src/ep_vb.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -857,3 +857,19 @@ bool EPVBucket::isValidDurabilityLevel(cb::durability::Level level) {
857857

858858
folly::assume_unreachable();
859859
}
860+
861+
void EPVBucket::processImplicitlyCompletedPrepare(
862+
HashTable::StoredValueProxy& v) {
863+
// As we have passed a StoredValueProxy to this function (the callers need
864+
// a HashTable::FindCommitResult) we need to be careful about our stats
865+
// updates. The StoredValueProxy attempts to do a
866+
// HashTable::Statistics::epilogue stats update when we destruct it. This is
867+
// generally fine, but if we want to use any other HashTable function with
868+
// a StoredValueProxy we need a way to skip the StoredValueProxy's stats
869+
// update as the other HashTable function will do its own. In this case,
870+
// we can call StoredValueProxy::release to release the ownership of the
871+
// pointer in the StoredValueProxy and skip any stats update. This consumes
872+
// the StoredValue* and invalidates the StoredValueProxy so it should not be
873+
// used after.
874+
ht.unlocked_del(v.getHBL(), v.release());
875+
}

engines/ep/src/ep_vb.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,9 @@ class EPVBucket : public VBucket {
296296

297297
bool isValidDurabilityLevel(cb::durability::Level level) override;
298298

299+
void processImplicitlyCompletedPrepare(
300+
HashTable::StoredValueProxy& v) override;
301+
299302
/**
300303
* Total number of alive (non-deleted) items on-disk in this vBucket.
301304
* Initially populated during warmup as the number of items on disk;

engines/ep/src/ephemeral_vb.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,3 +926,8 @@ bool EphemeralVBucket::isValidDurabilityLevel(cb::durability::Level level) {
926926

927927
folly::assume_unreachable();
928928
}
929+
930+
void EphemeralVBucket::processImplicitlyCompletedPrepare(
931+
HashTable::StoredValueProxy& v) {
932+
v.setCommitted(CommittedState::PrepareCommitted);
933+
}

engines/ep/src/ephemeral_vb.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,9 @@ class EphemeralVBucket : public VBucket {
318318

319319
bool isValidDurabilityLevel(cb::durability::Level level) override;
320320

321+
void processImplicitlyCompletedPrepare(
322+
HashTable::StoredValueProxy& htRes) override;
323+
321324
/**
322325
* (i) Repositions an already non-temp element in the sequence list (OR)
323326
* (ii) For a temp item that is being updated (that is, being made non-temp

engines/ep/src/vbucket.cc

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3152,11 +3152,19 @@ std::pair<MutationStatus, boost::optional<VBNotifyCtx>> VBucket::processSet(
31523152
getPassiveDM().completeSyncWrite(
31533153
itm.getKey(), PassiveDurabilityMonitor::Resolution::Commit);
31543154

3155-
// @TODO we must remove the prepare and overwrite the mutation if we
3156-
// are replacing a prepare with a mutation
3157-
// Release the pending SV from the SVP that is holding it to prevent
3158-
// a double stat update that would cause a stat underflow exception.
3159-
htRes.pending.release();
3155+
// Deal with the already existing prepare
3156+
processImplicitlyCompletedPrepare(htRes.pending);
3157+
3158+
// Add a new or overwrite the existing mutation
3159+
return processSetInner(htRes.pending.getHBL(),
3160+
htRes.committed,
3161+
itm,
3162+
cas,
3163+
allowExisting,
3164+
hasMetaData,
3165+
queueItmCtx,
3166+
storeIfStatus,
3167+
maybeKeyExists);
31603168
}
31613169

31623170
// This is a new SyncWrite, we just want to add a new prepare unless we
@@ -3400,11 +3408,24 @@ VBucket::processSoftDelete(HashTable::FindCommitResult& htRes,
34003408
StoredDocKey(v.getKey()),
34013409
PassiveDurabilityMonitor::Resolution::Commit);
34023410

3403-
// @TODO we must remove the prepare and overwrite the mutation if we
3404-
// are replacing a prepare with a mutation
3405-
// Release the pending SV from the SVP that is holding it to prevent
3406-
// a double stat update that would cause a stat underflow exception.
3407-
htRes.pending.release();
3411+
// Sanity - We should never delete a prepare in this way, and if we are
3412+
// hitting this delete path then we should have the committed SV.
3413+
if (!htRes.committed) {
3414+
return {MutationStatus::NotFound, htRes.committed, boost::none};
3415+
}
3416+
3417+
// Deal with the existing prepare
3418+
processImplicitlyCompletedPrepare(htRes.pending);
3419+
3420+
// Delete the existing committed SV.
3421+
return processSoftDeleteInner(htRes.getHBL(),
3422+
*htRes.committed,
3423+
cas,
3424+
metadata,
3425+
queueItmCtx,
3426+
use_meta,
3427+
bySeqno,
3428+
deleteSource);
34083429
}
34093430

34103431
return processSoftDeleteInner(htRes.getHBL(),

engines/ep/src/vbucket.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
18761876
bool use_meta,
18771877
uint64_t bySeqno,
18781878
DeleteSource deleteSource);
1879+
18791880
/**
18801881
* Delete a key (associated StoredValue) from ALL in-memory data structures
18811882
* like HT.
@@ -2365,6 +2366,16 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
23652366
*/
23662367
VBNotifyCtx queueItem(queued_item& item, const VBQueueItemCtx& ctx);
23672368

2369+
/**
2370+
* Deal with the prepare in the HashTable in the derived class specific way
2371+
* as it is to be "replaced" by a mutation. Consumes the StoredValue* in the
2372+
* StoredValueProxy making it no longer usable.
2373+
*
2374+
* @param v StoredValueProxy of the prepare to complete
2375+
*/
2376+
virtual void processImplicitlyCompletedPrepare(
2377+
HashTable::StoredValueProxy& v) = 0;
2378+
23682379
Vbid id;
23692380
std::atomic<vbucket_state_t> state;
23702381
folly::SharedMutex stateLock;

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -704,8 +704,8 @@ void DurabilityPassiveStreamTest::
704704

705705
SnapshotMarker marker(opaque,
706706
vbid,
707-
1 /*snapStart*/,
708-
3 /*snapEnd*/,
707+
2 /*snapStart*/,
708+
4 /*snapEnd*/,
709709
dcp_marker_flag_t::MARKER_FLAG_DISK,
710710
{} /*streamId*/);
711711
stream->processMarker(&marker);
@@ -714,7 +714,7 @@ void DurabilityPassiveStreamTest::
714714
using namespace cb::durability;
715715
auto item = makePendingItem(
716716
key, "value", Requirements(Level::Majority, Timeout::Infinity()));
717-
item->setBySeqno(1);
717+
item->setBySeqno(2);
718718
item->setCas(999);
719719

720720
// Send the prepare
@@ -730,7 +730,7 @@ void DurabilityPassiveStreamTest::
730730
cb::mcbp::DcpStreamId{})));
731731

732732
item = makeCommittedItem(key, "committed");
733-
item->setBySeqno(3);
733+
item->setBySeqno(4);
734734

735735
if (docState == DocumentState::Deleted) {
736736
item->setDeleted(DeleteSource::Explicit);
@@ -748,6 +748,26 @@ void DurabilityPassiveStreamTest::
748748
DocKeyEncodesCollectionId::No,
749749
nullptr,
750750
cb::mcbp::DcpStreamId{})));
751+
752+
auto vb = store->getVBucket(vbid);
753+
ASSERT_TRUE(vb);
754+
{
755+
// findForCommit will return both pending and committed perspectives
756+
auto res = vb->ht.findForCommit(key);
757+
ASSERT_TRUE(res.committed);
758+
EXPECT_EQ(4, res.committed->getBySeqno());
759+
if (docState == DocumentState::Alive) {
760+
EXPECT_TRUE(res.committed->getValue());
761+
}
762+
if (persistent()) {
763+
EXPECT_FALSE(res.pending);
764+
} else {
765+
ASSERT_TRUE(res.pending);
766+
EXPECT_EQ(2, res.pending->getBySeqno());
767+
EXPECT_EQ(CommittedState::PrepareCommitted,
768+
res.pending->getCommitted());
769+
}
770+
}
751771
}
752772

753773
TEST_P(DurabilityPassiveStreamTest,
@@ -756,9 +776,44 @@ TEST_P(DurabilityPassiveStreamTest,
756776
DocumentState::Alive);
757777
}
758778

779+
void DurabilityPassiveStreamTest::
780+
receiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDiskMutationFirst(
781+
DocumentState docState) {
782+
uint32_t opaque = 1;
783+
SnapshotMarker marker(opaque,
784+
vbid,
785+
1 /*snapStart*/,
786+
3 /*snapEnd*/,
787+
dcp_marker_flag_t::MARKER_FLAG_DISK,
788+
{} /*streamId*/);
789+
stream->processMarker(&marker);
790+
791+
auto key = makeStoredDocKey("key");
792+
auto item = makeCommittedItem(key, "mutation");
793+
item->setBySeqno(1);
794+
795+
EXPECT_EQ(ENGINE_SUCCESS,
796+
stream->messageReceived(std::make_unique<MutationConsumerMessage>(
797+
std::move(item),
798+
opaque,
799+
IncludeValue::Yes,
800+
IncludeXattrs::Yes,
801+
IncludeDeleteTime::No,
802+
DocKeyEncodesCollectionId::No,
803+
nullptr,
804+
cb::mcbp::DcpStreamId{})));
805+
testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(docState);
806+
}
807+
759808
TEST_P(DurabilityPassiveStreamTest,
760-
ReceiveDeletionInsteadOfCommitWhenStreamingFromDisk) {
761-
testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(
809+
ReceiveMutationInsteadOfCommitOnTopOfMutation) {
810+
receiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDiskMutationFirst(
811+
DocumentState::Alive);
812+
}
813+
814+
TEST_P(DurabilityPassiveStreamTest,
815+
ReceiveDeletionInsteadOfCommitOnTopOfMutation) {
816+
receiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDiskMutationFirst(
762817
DocumentState::Deleted);
763818
}
764819

@@ -816,7 +871,14 @@ void DurabilityPassiveStreamTest::
816871
{
817872
// findForCommit will return both pending and committed perspectives
818873
auto res = vb->ht.findForCommit(key);
819-
EXPECT_FALSE(res.pending);
874+
if (persistent()) {
875+
EXPECT_FALSE(res.pending);
876+
} else {
877+
ASSERT_TRUE(res.pending);
878+
EXPECT_EQ(1, res.pending->getBySeqno());
879+
EXPECT_EQ(CommittedState::PrepareCommitted,
880+
res.pending->getCommitted());
881+
}
820882
ASSERT_TRUE(res.committed);
821883
EXPECT_EQ(4, res.committed->getBySeqno());
822884
EXPECT_EQ(CommittedState::CommittedViaMutation,
@@ -853,12 +915,6 @@ TEST_P(DurabilityPassiveStreamTest,
853915
DocumentState::Alive);
854916
}
855917

856-
TEST_P(DurabilityPassiveStreamTest,
857-
ReceiveDeletionInsteadOfCommitForReconnectWindowWithPrepareLast) {
858-
testReceiveMutationOrDeletionInsteadOfCommitForReconnectWindowWithPrepareLast(
859-
DocumentState::Deleted);
860-
}
861-
862918
TEST_P(DurabilityPassiveStreamTest,
863919
ReceiveAbortOnTopOfCommittedDueToDedupedPrepare) {
864920
uint32_t opaque = 0;

engines/ep/tests/module_tests/dcp_durability_stream_test.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,16 @@ class DurabilityPassiveStreamTest : public SingleThreadedPassiveStreamTest {
100100
void testReceiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDisk(
101101
DocumentState docState);
102102

103+
/**
104+
* Test that a mutation or deletion sent instead of a commit is accepted by
105+
* the replica when backfilling from disk if it already has a mutation.
106+
*
107+
* @param docState Should we send a mutation or a deletion?
108+
*/
109+
void
110+
receiveMutationOrDeletionInsteadOfCommitWhenStreamingFromDiskMutationFirst(
111+
DocumentState docState);
112+
103113
/**
104114
* Test that a mutation or deletion sent instead of a commit is accepted by
105115
* the replica when in the reconnect window for which a prepare may be

0 commit comments

Comments
 (0)