Skip to content

Commit 270c444

Browse files
committed
MB-59518: Stop PassiveStream from processing ops against active VB
Add VBucket state checks to processSystemEvent and update the setWithMeta and deleteWithMeta calls to ensure only a replica or pending vbucket can be operated on by the PassiveStream. Change-Id: I783b5e2fc646dc9c191c1e341541c04bbbafafac Reviewed-on: https://review.couchbase.org/c/kv_engine/+/200507 Tested-by: Jim Walker <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Paolo Cocchi <[email protected]>
1 parent b82526a commit 270c444

File tree

5 files changed

+141
-7
lines changed

5 files changed

+141
-7
lines changed

engines/ep/src/dcp/passive_stream.cc

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -667,9 +667,7 @@ cb::engine_errc PassiveStream::processMessage(MutationConsumerMessage* message,
667667
0,
668668
nullptr,
669669
consumer->getCookie(),
670-
{vbucket_state_active,
671-
vbucket_state_replica,
672-
vbucket_state_pending},
670+
permittedVBStates,
673671
CheckConflicts::No,
674672
true,
675673
GenerateBySeqno::No,
@@ -703,9 +701,7 @@ cb::engine_errc PassiveStream::processMessage(MutationConsumerMessage* message,
703701
nullptr,
704702
message->getVBucket(),
705703
consumer->getCookie(),
706-
{vbucket_state_active,
707-
vbucket_state_replica,
708-
vbucket_state_pending},
704+
permittedVBStates,
709705
CheckConflicts::No,
710706
meta,
711707
GenerateBySeqno::No,
@@ -847,7 +843,6 @@ cb::engine_errc PassiveStream::processAbort(
847843
if (!vb) {
848844
return cb::engine_errc::not_my_vbucket;
849845
}
850-
851846
folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
852847
if (!permittedVBStates.test(vb->getState())) {
853848
return cb::engine_errc::not_my_vbucket;
@@ -878,6 +873,10 @@ cb::engine_errc PassiveStream::processSystemEvent(
878873
if (!vb) {
879874
return cb::engine_errc::not_my_vbucket;
880875
}
876+
folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
877+
if (!permittedVBStates.test(vb->getState())) {
878+
return cb::engine_errc::not_my_vbucket;
879+
}
881880

882881
cb::engine_errc rv = cb::engine_errc::success;
883882

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6816,6 +6816,87 @@ TEST_P(CDCPassiveStreamTest, TouchedByExpelCheckpointNotReused) {
68166816
0));
68176817
}
68186818

6819+
// MB-59518 is an issue where a DCP passive-stream processed a commit against
6820+
// an active vbucket resulting in a duplicate commit (and failure). The issue
6821+
// occurred because it was possible to reach the VBucket::commit function
6822+
// during a replica->active state change provided that operations are buffered.
6823+
// The fix was to put strong VBucket state checks in the PassiveStream commit
6824+
// path.
6825+
void SingleThreadedPassiveStreamTest::replicaToActiveBufferedRejected(
6826+
DcpResponse::Event event) {
6827+
consumer->disableFlatBuffersSystemEvents();
6828+
6829+
// Force consumer buffering.
6830+
auto& stats = engine->getEpStats();
6831+
stats.replicationThrottleThreshold = 0;
6832+
const size_t size = stats.getMaxDataSize();
6833+
engine->setMaxDataSize(1);
6834+
ASSERT_EQ(ReplicationThrottle::Status::Pause,
6835+
engine->getReplicationThrottle().getStatus());
6836+
6837+
auto vb = engine->getVBucket(vbid);
6838+
6839+
// Buffer the requested operation
6840+
snapshot(*consumer, stream->getOpaque(), 1, 1);
6841+
6842+
if (event == DcpResponse::Event::Mutation) {
6843+
EXPECT_EQ(cb::engine_errc::success,
6844+
mutation(*consumer,
6845+
stream->getOpaque(),
6846+
makeStoredDocKey("key"),
6847+
1));
6848+
} else if (event == DcpResponse::Event::Deletion) {
6849+
EXPECT_EQ(cb::engine_errc::success,
6850+
deletion(*consumer,
6851+
stream->getOpaque(),
6852+
makeStoredDocKey("key"),
6853+
1));
6854+
} else if (event == DcpResponse::Event::SystemEvent) {
6855+
EXPECT_EQ(cb::engine_errc::success,
6856+
createCollection(*consumer, stream->getOpaque(), 1));
6857+
} else {
6858+
FAIL() << "Unexpected event";
6859+
}
6860+
6861+
// Was buffered and not applied.
6862+
EXPECT_EQ(0, vb->getHighSeqno());
6863+
6864+
std::function<void()> hook = [this]() {
6865+
// Change the vbucket state using the hook. This will happen whilst the
6866+
// consumer buffering "thread" has a stream pointer. Important or not
6867+
// is that ns_server uses "null" in the chain when the failover occurs
6868+
setVBucketState(
6869+
vbid,
6870+
vbucket_state_active,
6871+
nlohmann::json::parse(R"({"topology":[["active",null]]})"),
6872+
TransferVB::No);
6873+
};
6874+
stream->setProcessBufferedMessages_postFront_Hook(hook);
6875+
6876+
// And process the buffered commit. This commit should be rejected!
6877+
stats.replicationThrottleThreshold = 99;
6878+
engine->setMaxDataSize(size);
6879+
ASSERT_EQ(ReplicationThrottle::Status::Process,
6880+
engine->getReplicationThrottle().getStatus());
6881+
EXPECT_EQ(more_to_process, consumer->processBufferedItems());
6882+
EXPECT_EQ(all_processed, consumer->processBufferedItems());
6883+
6884+
// Expect no change, operation was rejected
6885+
EXPECT_EQ(0, vb->getHighSeqno());
6886+
}
6887+
6888+
TEST_P(SingleThreadedPassiveStreamTest, ReplicaToActiveBufferedMutation) {
6889+
replicaToActiveBufferedRejected(DcpResponse::Event::Mutation);
6890+
}
6891+
6892+
TEST_P(SingleThreadedPassiveStreamTest, ReplicaToActiveBufferedDeletion) {
6893+
replicaToActiveBufferedRejected(DcpResponse::Event::Deletion);
6894+
}
6895+
6896+
TEST_P(SingleThreadedPassiveStreamTest, ReplicaToActiveBufferedSystemEvent) {
6897+
replicaToActiveBufferedRejected(DcpResponse::Event::SystemEvent);
6898+
}
6899+
68196900
INSTANTIATE_TEST_SUITE_P(Persistent,
68206901
CDCPassiveStreamTest,
68216902
STParameterizedBucketTest::magmaConfigValues(),

engines/ep/tests/module_tests/dcp_stream_test.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#pragma once
1313

14+
#include "dcp/response.h"
1415
#include "dcp_test.h"
1516
#include "hash_table.h"
1617
#include "test_manifest.h"
@@ -182,6 +183,9 @@ class SingleThreadedPassiveStreamTest
182183
const std::optional<cb::durability::Requirements>& durReqs,
183184
bool compressed = false);
184185

186+
// coverage for MB-59518
187+
void replicaToActiveBufferedRejected(DcpResponse::Event event);
188+
185189
protected:
186190
// Should the DcpConsumer have SyncReplication enabled when created in
187191
// SetUp()?

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "checkpoint_manager.h"
2828
#include "checkpoint_remover.h"
2929
#include "checkpoint_utils.h"
30+
#include "collections/events_generated.h"
3031
#include "collections/vbucket_manifest_handles.h"
3132
#include "dcp/active_stream_checkpoint_processor_task.h"
3233
#include "dcp/backfill-manager.h"
@@ -1093,6 +1094,22 @@ cb::engine_errc STParameterizedBucketTest::mutation(DcpConsumer& consumer,
10931094
/*nru*/ 0);
10941095
}
10951096

1097+
cb::engine_errc STParameterizedBucketTest::deletion(DcpConsumer& consumer,
1098+
uint32_t opaque,
1099+
const DocKey& key,
1100+
uint64_t seqno) {
1101+
return consumer.deletion(opaque,
1102+
key,
1103+
cb::const_byte_buffer(),
1104+
/*priv_bytes*/ 0,
1105+
PROTOCOL_BINARY_DATATYPE_JSON,
1106+
/*cas*/ 1,
1107+
vbid,
1108+
/*bySeqno*/ seqno,
1109+
/*revSeqno*/ 0,
1110+
/*meta*/ cb::const_byte_buffer());
1111+
}
1112+
10961113
cb::engine_errc STParameterizedBucketTest::prepare(DcpConsumer& consumer,
10971114
uint32_t opaque,
10981115
const DocKey& key,
@@ -1130,6 +1147,32 @@ cb::engine_errc STParameterizedBucketTest::abort(DcpConsumer& consumer,
11301147
return consumer.abort(opaque, vbid, key, prepareSeqno, seqno);
11311148
}
11321149

1150+
cb::engine_errc STParameterizedBucketTest::createCollection(
1151+
DcpConsumer& consumer, uint32_t opaque, uint64_t seqno) {
1152+
// Create just the fruit collection @ seqno using flatbuffer message
1153+
flatbuffers::FlatBufferBuilder fb;
1154+
const auto collection = CollectionEntry::fruit;
1155+
auto fbPayload = Collections::VB::CreateCollection(
1156+
fb,
1157+
1,
1158+
uint32_t(ScopeEntry::defaultS.getId()),
1159+
uint32_t(collection.getId()),
1160+
false, /*max-ttl*/
1161+
0,
1162+
fb.CreateString(collection.name.data(), collection.name.size()),
1163+
false /*history*/);
1164+
fb.Finish(fbPayload);
1165+
return consumer.systemEvent(
1166+
opaque,
1167+
vbid,
1168+
mcbp::systemevent::id::CreateCollection,
1169+
seqno,
1170+
mcbp::systemevent::version::version2,
1171+
{reinterpret_cast<const uint8_t*>(collection.name.data()),
1172+
collection.name.size()},
1173+
{fb.GetBufferPointer(), fb.GetSize()});
1174+
}
1175+
11331176
/*
11341177
* MB-31175
11351178
* The following test checks to see that when we call handleSlowStream in an

engines/ep/tests/module_tests/evp_store_single_threaded_test.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,10 @@ class STParameterizedBucketTest
839839
uint32_t opaque,
840840
const DocKey& key,
841841
uint64_t seqno);
842+
cb::engine_errc deletion(DcpConsumer& consumer,
843+
uint32_t opaque,
844+
const DocKey& key,
845+
uint64_t seqno);
842846
cb::engine_errc prepare(DcpConsumer& consumer,
843847
uint32_t opaque,
844848
const DocKey& key,
@@ -853,6 +857,9 @@ class STParameterizedBucketTest
853857
const DocKey& key,
854858
uint64_t prepareSeqno,
855859
uint64_t seqno);
860+
cb::engine_errc createCollection(DcpConsumer& consumer,
861+
uint32_t opaque,
862+
uint64_t seqno);
856863

857864
protected:
858865
void SetUp() override;

0 commit comments

Comments
 (0)