Skip to content

Commit ca0dcff

Browse files
committed
MB-59518: Add test coverage for a buffered prepare
Prepare already had correct state checks as KVBucket::prepare locks the vbucket and checks for replica or pending. A unit test is added to cover the case of a buffered prepare Change-Id: Ib7cae573f0f0ea051a1ea747c76f9bc5068b8696 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/200591 Reviewed-by: Paolo Cocchi <[email protected]> Tested-by: Jim Walker <[email protected]> Well-Formed: Restriction Checker
1 parent 270c444 commit ca0dcff

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4659,6 +4659,62 @@ TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedAbort) {
46594659
replicaToActiveBufferedResolution(false);
46604660
}
46614661

4662+
TEST_P(DurabilityPassiveStreamPersistentTest, ReplicaToActiveBufferedPrepare) {
4663+
// Begin by pushing a mutation. Only to get away from seqno:0
4664+
auto key = makeStoredDocKey("key");
4665+
EXPECT_EQ(cb::engine_errc::success,
4666+
snapshot(*consumer, stream->getOpaque(), 1, 1));
4667+
EXPECT_EQ(cb::engine_errc::success,
4668+
mutation(*consumer, stream->getOpaque(), key, 1));
4669+
flushVBucketToDiskIfPersistent(vbid);
4670+
4671+
EXPECT_EQ(cb::engine_errc::success,
4672+
snapshot(*consumer, stream->getOpaque(), 2, 2));
4673+
4674+
// Now begin buffering.
4675+
auto& stats = engine->getEpStats();
4676+
stats.replicationThrottleThreshold = 0;
4677+
const size_t size = stats.getMaxDataSize();
4678+
engine->setMaxDataSize(1);
4679+
ASSERT_EQ(ReplicationThrottle::Status::Pause,
4680+
engine->getReplicationThrottle().getStatus());
4681+
4682+
auto vb = engine->getVBucket(vbid);
4683+
4684+
// Buffer the prepare.
4685+
// Note to detect a bug when processing a buffered snapshot marker, if we
4686+
// set the type as DISK it would hit an exception when calling
4687+
// setDuplicatePrepareWindow.
4688+
// Now push a prepare onto the replica.
4689+
auto durableKey = makeStoredDocKey("durable");
4690+
4691+
EXPECT_EQ(cb::engine_errc::success,
4692+
prepare(*consumer, stream->getOpaque(), durableKey, 2));
4693+
EXPECT_EQ(1, vb->getHighSeqno());
4694+
4695+
std::function<void()> hook = [this]() {
4696+
// Change the vbucket state using the hook. This will happen whilst the
4697+
// consumer buffering "thread" has a stream pointer.
4698+
setVBucketState(vbid,
4699+
vbucket_state_active,
4700+
nlohmann::json::parse(R"({"topology":[["active"]]})"),
4701+
TransferVB::No);
4702+
};
4703+
stream->setProcessBufferedMessages_postFront_Hook(hook);
4704+
4705+
// And process the buffered resolution. This operation should be rejected.
4706+
// Prior to the fix it would be processed against the active.
4707+
stats.replicationThrottleThreshold = 99;
4708+
engine->setMaxDataSize(size);
4709+
ASSERT_EQ(ReplicationThrottle::Status::Process,
4710+
engine->getReplicationThrottle().getStatus());
4711+
EXPECT_EQ(more_to_process, consumer->processBufferedItems());
4712+
EXPECT_EQ(all_processed, consumer->processBufferedItems());
4713+
4714+
// Prepare must be rejected, no change to high-seq
4715+
EXPECT_EQ(1, vb->getHighSeqno());
4716+
}
4717+
46624718
void DurabilityPromotionStreamTest::SetUp() {
46634719
// Set up as a replica
46644720
DurabilityPassiveStreamTest::SetUp();

0 commit comments

Comments
 (0)