2020 */
2121
2222#include < memcached/protocol_binary.h>
23+ #include < platform/dirutils.h>
2324#include < programs/engine_testapp/mock_server.h>
2425#include < tests/mock/mock_checkpoint_manager.h>
2526#include < tests/mock/mock_dcp_consumer.h>
@@ -40,6 +41,22 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
4041protected:
4142 void SetUp () override {
4243 SingleThreadedKVBucketTest::SetUp ();
44+
45+ // Paranoia - remove any previous replica disk files.
46+ try {
47+ cb::io::rmrf (std::string (test_dbname) + " -replica" );
48+ } catch (std::system_error& e) {
49+ if (e.code () != std::error_code (ENOENT, std::system_category ())) {
50+ throw e;
51+ }
52+ }
53+
54+ ASSERT_EQ (ENGINE_SUCCESS,
55+ engine->getKVBucket ()->setVBucketState (
56+ vbid,
57+ vbucket_state_active,
58+ {{" topology" ,
59+ nlohmann::json::array ({{" active" , " replica" }})}}));
4360 }
4461
4562 ENGINE_ERROR_CODE getInternalHelper (const DocKey& key) {
@@ -51,38 +68,15 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
5168 .getStatus ();
5269 }
5370
54- void setupProducer (EnableExpiryOutput enableExpiryOutput,
55- uint32_t exp_time = 0 ) {
56- // Setup the source (active) Bucket.
57- EXPECT_EQ (ENGINE_SUCCESS,
58- engine->getKVBucket ()->setVBucketState (vbid,
59- vbucket_state_active));
60-
61- // Add some items to the source Bucket.
62- auto key1 = makeStoredDocKey (" key1" );
63- auto key2 = makeStoredDocKey (" key2" );
64- auto key3 = makeStoredDocKey (" key3" );
65- store_item (vbid, key1, " value" , exp_time);
66- store_item (vbid, key2, " value" , exp_time);
67- store_item (vbid, key3, " value" , exp_time);
68-
71+ void setupProducer (EnableExpiryOutput enableExpiryOutput) {
6972 // Create the Dcp producer.
7073 producer = SingleThreadedKVBucketTest::createDcpProducer (
7174 cookie,
7275 IncludeDeleteTime::No);
7376 producer->scheduleCheckpointProcessorTask ();
74-
75- // Setup conditions for expirations
76- auto expectedGetOutcome = ENGINE_SUCCESS;
7777 if (enableExpiryOutput == EnableExpiryOutput::Yes) {
7878 producer->setDCPExpiry (true );
79- expectedGetOutcome = ENGINE_KEY_ENOENT;
8079 }
81- TimeTraveller t (1080 );
82- // Trigger expiries on a get, or just check that the key exists
83- EXPECT_EQ (expectedGetOutcome, getInternalHelper (key1));
84- EXPECT_EQ (expectedGetOutcome, getInternalHelper (key2));
85- EXPECT_EQ (expectedGetOutcome, getInternalHelper (key3));
8680
8781 auto & sourceVb = *engine->getVBucket (vbid);
8882 producer->mockActiveStreamRequest (consumerStream->getFlags (),
@@ -96,12 +90,12 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
9690 producerStream = dynamic_cast <MockActiveStream*>(
9791 producer->findStream (vbid).get ());
9892
99- ASSERT_EQ (2 , static_cast <MockCheckpointManager*>(
100- sourceVb.checkpointManager .get ())
101- ->getNumOfCursors ())
93+ ASSERT_EQ (2 ,
94+ static_cast <MockCheckpointManager*>(
95+ sourceVb.checkpointManager .get ())
96+ ->getNumOfCursors ())
10297 << " Should have both persistence and DCP producer cursor on "
103- " source "
104- " VB" ;
98+ " source VB" ;
10599
106100 // Creating a producer will schedule one
107101 // ActiveStreamCheckpointProcessorTask
@@ -174,6 +168,17 @@ class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
174168 " nextCheckpointItemTask()" ;
175169 return getNextProducerMsg (stream);
176170 }
171+
172+ // Cannot pass mutation/deletion directly to the consumer as the object
173+ // is different
174+ if (producerMsg->getEvent () == DcpResponse::Event::Mutation ||
175+ producerMsg->getEvent () == DcpResponse::Event::Deletion ||
176+ producerMsg->getEvent () == DcpResponse::Event::Expiration ||
177+ producerMsg->getEvent () == DcpResponse::Event::Prepare) {
178+ producerMsg = std::make_unique<MutationConsumerMessage>(
179+ *static_cast <MutationResponse*>(producerMsg.get ()));
180+ }
181+
177182 return producerMsg;
178183 }
179184
@@ -230,13 +235,33 @@ void DCPLoopbackStreamTest::takeoverTest(
230235 exp_time = time (NULL ) + 256 ;
231236 }
232237
238+ // Add some items to the source Bucket.
239+ std::vector<StoredDocKey> keys;
240+ keys.push_back (makeStoredDocKey (" key1" ));
241+ keys.push_back (makeStoredDocKey (" key2" ));
242+ keys.push_back (makeStoredDocKey (" key3" ));
243+ for (const auto & key : keys) {
244+ store_item (vbid, key, " value" , exp_time);
245+ }
246+
247+ // Setup conditions for expirations
248+ auto expectedGetOutcome = ENGINE_SUCCESS;
249+ if (enableExpiryOutput == EnableExpiryOutput::Yes) {
250+ expectedGetOutcome = ENGINE_KEY_ENOENT;
251+ }
252+ TimeTraveller t (1080 );
253+ // Trigger expiries on a get, or just check that the key exists
254+ for (const auto & key : keys) {
255+ EXPECT_EQ (expectedGetOutcome, getInternalHelper (key));
256+ }
257+
233258 // Note: the order matters.
234259 // First, we setup the Consumer with the given flags and we discard the
235260 // StreamRequest message from the Consumer::readyQ.
236261 // Then, we simulate the Producer receiving the StreamRequest just
237262 // by creating the Producer with the Consumer's flags
238263 setupConsumer (DCP_ADD_STREAM_FLAG_TAKEOVER);
239- setupProducer (enableExpiryOutput, exp_time );
264+ setupProducer (enableExpiryOutput);
240265
241266 // Both streams created. Check state is as expected.
242267 ASSERT_TRUE (producerStream->isTakeoverSend ())
@@ -247,15 +272,6 @@ void DCPLoopbackStreamTest::takeoverTest(
247272 auto producerMsg = getNextProducerMsg (producerStream);
248273 ASSERT_TRUE (producerMsg);
249274
250- // Cannot pass mutation/deletion directly to the consumer as the object
251- // is different
252- if (producerMsg->getEvent () == DcpResponse::Event::Mutation ||
253- producerMsg->getEvent () == DcpResponse::Event::Deletion ||
254- producerMsg->getEvent () == DcpResponse::Event::Expiration) {
255- producerMsg = std::make_unique<MutationConsumerMessage>(
256- *static_cast <MutationResponse*>(producerMsg.get ()));
257- }
258-
259275 // Pass the message onto the consumer.
260276 EXPECT_EQ (ENGINE_SUCCESS,
261277 consumerStream->messageReceived (std::move (producerMsg)));
0 commit comments