Skip to content

Commit 0e6003c

Browse files
committed
MB-59518: Stop PassiveStream from processing commit against active VB
Add VBucket state checks to processCommit so that the commit is not processed against an active vbucket. Without the state check, a buffered commit could be processed concurrently with a replica->active state change. A unit test is added which reproduces the issue by buffering a commit and using a test hook to change the vbucket state whilst the buffered data is processed.

Note this bug also extends to:
* abort
* mutation, deletion, system-event
* snapshot marker

Subsequent patches address these operations.

Change-Id: Ifb5e1c7703d7fe2af0b92b699dffb091dea0b235
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/200568
Reviewed-by: Dave Rigby <[email protected]>
Tested-by: Jim Walker <[email protected]>
Well-Formed: Restriction Checker
1 parent 5b2ea3d commit 0e6003c

File tree

5 files changed

+178
-0
lines changed

5 files changed

+178
-0
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,10 @@ cb::engine_errc PassiveStream::processCommit(
818818
if (!vb) {
819819
return cb::engine_errc::not_my_vbucket;
820820
}
821+
folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
822+
if (!permittedVBStates.test(vb->getState())) {
823+
return cb::engine_errc::not_my_vbucket;
824+
}
821825

822826
auto rv = vb->commit(commit.getKey(),
823827
commit.getPreparedSeqno(),

engines/ep/src/dcp/passive_stream.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#pragma once
1212

1313
#include "dcp/stream.h"
14+
#include "permitted_vb_states.h"
1415
#include "spdlog/common.h"
1516
#include "utilities/testing_hook.h"
1617
#include "vbucket_fwd.h"
@@ -434,4 +435,9 @@ class PassiveStream : public Stream {
434435

435436
// True if the consumer/producer enabled FlatBuffers
436437
bool flatBuffersSystemEventsEnabled{false};
438+
439+
// Set of states that the vbucket must match for a PassiveStream to attempt
440+
// processing messages.
441+
const PermittedVBStates permittedVBStates{vbucket_state_replica,
442+
vbucket_state_pending};
437443
};

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4551,6 +4551,92 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
45514551
EXPECT_EQ(ackBytes, consumer->getFlowControl().getFreedBytes());
45524552
}
45534553

4554+
// MB-59518 is an issue where a DCP passive-stream processed a commit against
4555+
// an active vbucket resulting in a duplicate commit (and failure). The issue
4556+
// occurred because it was possible to reach the VBucket::commit function
4557+
// during a replica->active state change provided that operations are buffered.
4558+
// The fix was to put strong VBucket state checks in the PassiveStream commit
4559+
// path.
4560+
TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
4561+
// Begin by pushing a mutation at seqno 1, only so that the test moves away from seqno:0.
4562+
auto key = makeStoredDocKey("key");
4563+
EXPECT_EQ(cb::engine_errc::success,
4564+
snapshot(*consumer, stream->getOpaque(), 1, 1));
4565+
EXPECT_EQ(cb::engine_errc::success,
4566+
mutation(*consumer, stream->getOpaque(), key, 1));
4567+
flushVBucketToDiskIfPersistent(vbid);
4568+
4569+
// Now push a prepare (seqno 2) onto the replica and flush it.
4570+
auto durableKey = makeStoredDocKey("durable");
4571+
EXPECT_EQ(cb::engine_errc::success,
4572+
snapshot(*consumer, stream->getOpaque(), 2, 2));
4573+
EXPECT_EQ(cb::engine_errc::success,
4574+
prepare(*consumer, stream->getOpaque(), durableKey, 2));
4575+
flushVBucketToDiskIfPersistent(vbid);
4576+
4577+
// Now begin buffering: zero the throttle threshold and shrink the max data size so that the replication throttle reports Pause (asserted below). The original max data size is saved so it can be restored later.
4578+
auto& stats = engine->getEpStats();
4579+
stats.replicationThrottleThreshold = 0;
4580+
const size_t size = stats.getMaxDataSize();
4581+
engine->setMaxDataSize(1);
4582+
ASSERT_EQ(ReplicationThrottle::Status::Pause,
4583+
engine->getReplicationThrottle().getStatus());
4584+
4585+
auto vb = engine->getVBucket(vbid);
4586+
4587+
// Buffer the commit (seqno 3) of the prepare. The high-seqno is expected to stay at 2 while the commit is only buffered.
4588+
EXPECT_EQ(cb::engine_errc::success,
4589+
snapshot(*consumer, stream->getOpaque(), 3, 3));
4590+
EXPECT_EQ(cb::engine_errc::success,
4591+
commit(*consumer, stream->getOpaque(), durableKey, 2, 3));
4592+
EXPECT_EQ(2, vb->getHighSeqno());
4593+
4594+
std::function<void()> hook = [this]() {
4595+
// Change the vbucket state using the hook. This will happen whilst the
4596+
// consumer buffering "thread" has a stream pointer.
4597+
setVBucketState(vbid,
4598+
vbucket_state_active,
4599+
nlohmann::json::parse(R"({"topology":[["active"]]})"),
4600+
TransferVB::No);
4601+
};
4602+
stream->setProcessBufferedMessages_postFront_Hook(hook);
4603+
4604+
// Restore the throttle settings and process the buffered commit. This commit should be rejected. Prior
4605+
// to the fix it would be processed against the active.
4606+
stats.replicationThrottleThreshold = 99;
4607+
engine->setMaxDataSize(size);
4608+
ASSERT_EQ(ReplicationThrottle::Status::Process,
4609+
engine->getReplicationThrottle().getStatus());
4610+
EXPECT_EQ(more_to_process, consumer->processBufferedItems());
4611+
EXPECT_EQ(all_processed, consumer->processBufferedItems());
4612+
4613+
// Before the fix, the high-seqno would be at 3 (the commit).
4614+
EXPECT_EQ(2, vb->getHighSeqno());
4615+
4616+
setVBucketState(
4617+
vbid,
4618+
vbucket_state_active,
4619+
nlohmann::json::parse(
4620+
R"({"topology":[["active"],["active","replica"]]})"));
4621+
4622+
// In MB-59518 it's not clear exactly what sequence of ACKs permits the
4623+
// tracked write to resolve (in the next step), but this is enough for a
4624+
// reproduction of the crash.
4625+
vb->seqnoAcknowledged(folly::SharedMutex::ReadHolder(vb->getStateLock()),
4626+
"replica",
4627+
2 /*prepareSeqno*/);
4628+
4629+
// This is where the failure occurred. With MB-59518 an error is logged about a failure to find the
4630+
// prepare; this function would throw because the commit is not successful.
4631+
try {
4632+
vb->processResolvedSyncWrites();
4633+
} catch (const std::exception& e) {
4634+
FAIL() << "processResolvedSyncWrites exception:" << e.what();
4635+
}
4636+
EXPECT_EQ(3, vb->getHighSeqno());
4637+
flushVBucketToDiskIfPersistent(vbid);
4638+
}
4639+
45544640
void DurabilityPromotionStreamTest::SetUp() {
45554641
// Set up as a replica
45564642
DurabilityPassiveStreamTest::SetUp();

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,68 @@ cb::engine_errc STParameterizedBucketTest::addItem(Item& itm,
10651065
return rc;
10661066
}
10671067

1068+
// Test helper: sends a DCP snapshot marker to `consumer` for this fixture's
// `vbid` and returns the status from DcpConsumer::snapshotMarker.
// `opaque`, `start` and `end` are forwarded unchanged; the marker flags are
// always MARKER_FLAG_MEMORY | MARKER_FLAG_CHK.
// NOTE(review): the two trailing 0 arguments are passed as-is; their meaning
// is defined by DcpConsumer::snapshotMarker, which is not visible here.
cb::engine_errc STParameterizedBucketTest::snapshot(DcpConsumer& consumer,
1069+
uint32_t opaque,
1070+
uint64_t start,
1071+
uint64_t end) {
1072+
return consumer.snapshotMarker(opaque,
1073+
vbid,
1074+
start,
1075+
end,
1076+
MARKER_FLAG_MEMORY | MARKER_FLAG_CHK,
1077+
0,
1078+
0);
1079+
}
1080+
1081+
// Test helper: sends a DCP mutation for `key` at `seqno` to `consumer` for
// this fixture's `vbid` and returns the status from DcpConsumer::mutation.
// The value and meta are empty buffers, the datatype is
// PROTOCOL_BINARY_DATATYPE_JSON, cas is 1, and priv_bytes, flags, revSeqno,
// expTime, lock_time and nru are all 0.
cb::engine_errc STParameterizedBucketTest::mutation(DcpConsumer& consumer,
1082+
uint32_t opaque,
1083+
const DocKey& key,
1084+
uint64_t seqno) {
1085+
return consumer.mutation(opaque,
1086+
key,
1087+
cb::const_byte_buffer(),
1088+
/*priv_bytes*/ 0,
1089+
PROTOCOL_BINARY_DATATYPE_JSON,
1090+
/*cas*/ 1,
1091+
vbid,
1092+
/*flags*/ 0,
1093+
/*bySeqno*/ seqno,
1094+
/*revSeqno*/ 0,
1095+
/*expTime*/ 0,
1096+
/*lock_time*/ 0,
1097+
/*meta*/ cb::const_byte_buffer(),
1098+
/*nru*/ 0);
1099+
}
1100+
1101+
// Test helper: sends a DCP prepare for `key` at `seqno` to `consumer` for this
// fixture's `vbid` and returns the status from DcpConsumer::prepare.
// The value is an empty buffer, the datatype is PROTOCOL_BINARY_DATATYPE_JSON,
// cas is 1, the document state is DocumentState::Alive and the durability
// level is cb::durability::Level::Majority; priv_bytes, flags, revSeqno,
// expTime and lock_time are all 0.
// NOTE(review): the unlabelled 0 after lock_time is presumably nru (mirroring
// the mutation helper) - confirm against DcpConsumer::prepare.
cb::engine_errc STParameterizedBucketTest::prepare(DcpConsumer& consumer,
1102+
uint32_t opaque,
1103+
const DocKey& key,
1104+
uint64_t seqno) {
1105+
return consumer.prepare(opaque,
1106+
key,
1107+
cb::const_byte_buffer(),
1108+
/*priv_bytes*/ 0,
1109+
PROTOCOL_BINARY_DATATYPE_JSON,
1110+
/*cas*/ 1,
1111+
vbid,
1112+
/*flags*/ 0,
1113+
/*bySeqno*/ seqno,
1114+
/*revSeqno*/ 0,
1115+
/*expTime*/ 0,
1116+
/*lock_time*/ 0,
1117+
0,
1118+
DocumentState::Alive,
1119+
cb::durability::Level::Majority);
1120+
}
1121+
1122+
// Test helper: sends a DCP commit for `key` to `consumer` for this fixture's
// `vbid` and returns the status from DcpConsumer::commit. `opaque`,
// `prepareSeqno` and `seqno` are forwarded unchanged.
cb::engine_errc STParameterizedBucketTest::commit(DcpConsumer& consumer,
1123+
uint32_t opaque,
1124+
const DocKey& key,
1125+
uint64_t prepareSeqno,
1126+
uint64_t seqno) {
1127+
return consumer.commit(opaque, vbid, key, prepareSeqno, seqno);
1128+
}
1129+
10681130
/*
10691131
* MB-31175
10701132
* The following test checks to see that when we call handleSlowStream in an

engines/ep/tests/module_tests/evp_store_single_threaded_test.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,26 @@ class STParameterizedBucketTest
827827
return expected;
828828
}
829829

830+
// A set of basic helper functions to reduce the size of tests which are
831+
// driving a DCPConsumer
832+
// Send a snapshot marker (start, end) to the consumer for the test's vbid;
// returns the consumer's status code.
cb::engine_errc snapshot(DcpConsumer& consumer,
833+
uint32_t opaque,
834+
uint64_t start,
835+
uint64_t end);
836+
// Send a mutation of `key` at `seqno` (empty value) to the consumer for the
// test's vbid; returns the consumer's status code.
cb::engine_errc mutation(DcpConsumer& consumer,
837+
uint32_t opaque,
838+
const DocKey& key,
839+
uint64_t seqno);
840+
// Send a Majority-level prepare of `key` at `seqno` (empty value) to the
// consumer for the test's vbid; returns the consumer's status code.
cb::engine_errc prepare(DcpConsumer& consumer,
841+
uint32_t opaque,
842+
const DocKey& key,
843+
uint64_t seqno);
844+
// Send a commit of `key` (prepared at `prepareSeqno`, committed at `seqno`)
// to the consumer for the test's vbid; returns the consumer's status code.
cb::engine_errc commit(DcpConsumer& consumer,
845+
uint32_t opaque,
846+
const DocKey& key,
847+
uint64_t prepareSeqno,
848+
uint64_t seqno);
849+
830850
protected:
831851
void SetUp() override;
832852

0 commit comments

Comments
 (0)