Skip to content

Commit 8f2290e

Browse files
committed
MB-59518: Stop PassiveStream from processing abort against active VB
Add VBucket state checks to processAbort so that the abort is not processed against an active vbucket. Change-Id: I757e24e55ada716932bd49b984ec5d5b5f75c444 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/200506 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Jim Walker <[email protected]> Well-Formed: Restriction Checker
1 parent 0e6003c commit 8f2290e

File tree

5 files changed

+43
-7
lines changed

5 files changed

+43
-7
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,11 @@ cb::engine_errc PassiveStream::processAbort(
848848
return cb::engine_errc::not_my_vbucket;
849849
}
850850

851+
folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
852+
if (!permittedVBStates.test(vb->getState())) {
853+
return cb::engine_errc::not_my_vbucket;
854+
}
855+
851856
auto rv = vb->abort(abort.getKey(),
852857
abort.getPreparedSeqno(),
853858
abort.getAbortSeqno(),

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4557,7 +4557,8 @@ TEST_P(DurabilityPassiveStreamPersistentTest, BufferDcpAbort) {
45574557
// during a replica->active state change provided that operations are buffered.
45584558
// The fix was to put strong VBucket state checks in the PassiveStream commit
45594559
// path.
4560-
TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
4560+
void DurabilityPassiveStreamPersistentTest::replicaToActiveBufferedResolution(
4561+
bool resolutionIsCommit) {
45614562
// Begin by pushing a mutation. Only to get away from seqno:0
45624563
auto key = makeStoredDocKey("key");
45634564
EXPECT_EQ(cb::engine_errc::success,
@@ -4584,11 +4585,16 @@ TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
45844585

45854586
auto vb = engine->getVBucket(vbid);
45864587

4587-
// Buffer the commit of the prepare
4588+
// Buffer the commit/abort of the prepare
45884589
EXPECT_EQ(cb::engine_errc::success,
45894590
snapshot(*consumer, stream->getOpaque(), 3, 3));
4590-
EXPECT_EQ(cb::engine_errc::success,
4591-
commit(*consumer, stream->getOpaque(), durableKey, 2, 3));
4591+
if (resolutionIsCommit) {
4592+
EXPECT_EQ(cb::engine_errc::success,
4593+
commit(*consumer, stream->getOpaque(), durableKey, 2, 3));
4594+
} else {
4595+
EXPECT_EQ(cb::engine_errc::success,
4596+
abort(*consumer, stream->getOpaque(), durableKey, 2, 3));
4597+
}
45924598
EXPECT_EQ(2, vb->getHighSeqno());
45934599

45944600
std::function<void()> hook = [this]() {
@@ -4601,16 +4607,16 @@ TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
46014607
};
46024608
stream->setProcessBufferedMessages_postFront_Hook(hook);
46034609

4604-
// And process the buffered commit. This commit should be rejected. Prior
4605-
// to the fix it would be processed against the active.
4610+
// And process the buffered resolution. This operation should be rejected.
4611+
// Prior to the fix it would be processed against the active.
46064612
stats.replicationThrottleThreshold = 99;
46074613
engine->setMaxDataSize(size);
46084614
ASSERT_EQ(ReplicationThrottle::Status::Process,
46094615
engine->getReplicationThrottle().getStatus());
46104616
EXPECT_EQ(more_to_process, consumer->processBufferedItems());
46114617
EXPECT_EQ(all_processed, consumer->processBufferedItems());
46124618

4613-
// Before fixing high-seqno would be at 3 (commit)
4619+
// Before fixing high-seqno would be at 3 (commit/abort)
46144620
EXPECT_EQ(2, vb->getHighSeqno());
46154621

46164622
setVBucketState(
@@ -4628,6 +4634,7 @@ TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
46284634

46294635
// And fail. With MB-59518 an error is logged about a failure to find the
46304636
// prepare, this function would throw because the commit is not successful
4637+
// Note the abort case would fail here in the same way as commit
46314638
try {
46324639
vb->processResolvedSyncWrites();
46334640
} catch (const std::exception& e) {
@@ -4637,6 +4644,14 @@ TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
46374644
flushVBucketToDiskIfPersistent(vbid);
46384645
}
46394646

4647+
TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedCommit) {
4648+
replicaToActiveBufferedResolution(true);
4649+
}
4650+
4651+
TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedAbort) {
4652+
replicaToActiveBufferedResolution(false);
4653+
}
4654+
46404655
void DurabilityPromotionStreamTest::SetUp() {
46414656
// Set up as a replica
46424657
DurabilityPassiveStreamTest::SetUp();

engines/ep/tests/module_tests/dcp_durability_stream_test.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,9 @@ class DurabilityPassiveStreamPersistentTest
275275
* Read the highCompletedSeqno from disk.
276276
*/
277277
uint64_t getPersistedHCS();
278+
279+
/// Test coverage for MB-59518
280+
void replicaToActiveBufferedResolution(bool resolutionIsCommit);
278281
};
279282

280283
class DurabilityPassiveStreamEphemeralTest

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,14 @@ cb::engine_errc STParameterizedBucketTest::commit(DcpConsumer& consumer,
11271127
return consumer.commit(opaque, vbid, key, prepareSeqno, seqno);
11281128
}
11291129

1130+
cb::engine_errc STParameterizedBucketTest::abort(DcpConsumer& consumer,
1131+
uint32_t opaque,
1132+
const DocKey& key,
1133+
uint64_t prepareSeqno,
1134+
uint64_t seqno) {
1135+
return consumer.abort(opaque, vbid, key, prepareSeqno, seqno);
1136+
}
1137+
11301138
/*
11311139
* MB-31175
11321140
* The following test checks to see that when we call handleSlowStream in an

engines/ep/tests/module_tests/evp_store_single_threaded_test.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,11 @@ class STParameterizedBucketTest
846846
const DocKey& key,
847847
uint64_t prepareSeqno,
848848
uint64_t seqno);
849+
cb::engine_errc abort(DcpConsumer& consumer,
850+
uint32_t opaque,
851+
const DocKey& key,
852+
uint64_t prepareSeqno,
853+
uint64_t seqno);
849854

850855
protected:
851856
void SetUp() override;

0 commit comments

Comments
 (0)