Skip to content

Commit dfeff61

Browse files
committed
MB-29287: Add dcp_reflection_test
Add new set of DCP unit tests, which connect DcpProducer and DcpConsumer objects; and reflect the messages between them. This allows us to test how the producer and consumer communicate; without involving ns_server. The tests are purely constrained to ep-engine - i.e. no packets are actually transmitted over the network; we instead just inject the messages directly into the DcpProducer / DcpConsumer objects. Change-Id: I641826238dc09c9b94d6540b8e4a5edc656883dc Reviewed-on: http://review.couchbase.org/92984 Well-Formed: Build Bot <[email protected]> Reviewed-by: Tim Bradgate <[email protected]> Reviewed-by: Jim Walker <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent fab7de9 commit dfeff61

File tree

4 files changed

+235
-0
lines changed

4 files changed

+235
-0
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
189189
tests/module_tests/checkpoint_test.cc
190190
tests/module_tests/defragmenter_test.cc
191191
tests/module_tests/ep_unit_tests_main.cc
192+
tests/module_tests/dcp_reflection_test.cc
192193
tests/module_tests/dcp_test.cc
193194
tests/module_tests/evp_engine_test.cc
194195
tests/module_tests/evp_store_rollback_test.cc

src/dcp/stream.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ bool Stream::isInMemory() const {
7070
return state_.load() == STREAM_IN_MEMORY;
7171
}
7272

73+
bool Stream::isInTakeoverSend() const {
74+
return state_.load() == STREAM_TAKEOVER_SEND;
75+
}
76+
7377
void Stream::clear_UNLOCKED() {
7478
while (!readyQ.empty()) {
7579
DcpResponse* resp = readyQ.front();

src/dcp/stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ class Stream : public RCValue {
139139
/// @Returns true if state_ is InMemory
140140
bool isInMemory() const;
141141

142+
bool isInTakeoverSend() const;
143+
142144
void clear() {
143145
LockHolder lh(streamMutex);
144146
clear_UNLOCKED();
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/*
19+
* Unit tests for DCP which connecting a DCP Producer to a DCP Consumer.
20+
*/
21+
22+
#include <memcached/protocol_binary.h>
23+
#include <programs/engine_testapp/mock_server.h>
24+
#include <tests/mock/mock_dcp_consumer.h>
25+
#include <tests/mock/mock_dcp_producer.h>
26+
#include <tests/mock/mock_stream.h>
27+
28+
#include "evp_store_single_threaded_test.h"
29+
30+
/**
31+
* Test fixture which creates two ep-engine (bucket) instances, using one
32+
* as a source for DCP replication and the second as the destination.
33+
*/
34+
class DCPLoopbackStreamTest : public SingleThreadedEPStoreTest {
35+
protected:
36+
void SetUp() override {
37+
SingleThreadedEPStoreTest::SetUp();
38+
39+
// In addition to the initial engine which is created; we also need
40+
// to create a second bucket instance for the destination (replica)
41+
// vBucket.
42+
std::string config = config_string;
43+
if (config.size() > 0) {
44+
config += ";";
45+
}
46+
config += "dbname=" + std::string(test_dbname) + "-replica";
47+
replicaEngine = SynchronousEPEngine::build(config);
48+
49+
setupProducerAndConsumerStreams();
50+
}
51+
52+
void setupProducerAndConsumerStreams() {
53+
// Setup the source (active) and destination (replica) Buckets.
54+
EXPECT_EQ(ENGINE_SUCCESS,
55+
engine->getEpStore()->setVBucketState(
56+
vbid,
57+
vbucket_state_active, /*transfer*/
58+
false));
59+
EXPECT_EQ(ENGINE_SUCCESS,
60+
replicaEngine->getEpStore()->setVBucketState(
61+
vbid,
62+
vbucket_state_replica, /*transfer*/
63+
false));
64+
65+
auto* sourceVb = engine->getVBucket(vbid).get();
66+
67+
// Add some items to replicate / takeover to the source Bucket.
68+
store_item(vbid,
69+
"key1", "value");
70+
store_item(vbid,
71+
"key2", "value");
72+
store_item(vbid,
73+
"key3", "value");
74+
75+
// Setup the consumer.
76+
consumer =
77+
new MockDcpConsumer(*replicaEngine, nullptr, "test_consumer");
78+
EXPECT_EQ(ENGINE_SUCCESS,
79+
consumer->addStream(
80+
/*opaque*/ 0, vbid, DCP_ADD_STREAM_FLAG_TAKEOVER));
81+
consumerStream = consumer->getVbucketStream(vbid).get();
82+
83+
// Need to discard the first message from the consumerStream (the
84+
// StreamRequest), as we'll manually set that up in the producer.
85+
{
86+
std::unique_ptr<DcpResponse> streamRequest(consumerStream->next());
87+
EXPECT_NE(nullptr, streamRequest);
88+
EXPECT_EQ(DCP_STREAM_REQ, streamRequest->getEvent());
89+
}
90+
91+
// Create the Dcp producer.
92+
producer = new MockDcpProducer(*engine,
93+
cookie,
94+
"test_producer",
95+
/*notifyOnly*/ false);
96+
mockProducer = dynamic_cast<MockDcpProducer*>(producer.get());
97+
98+
mockProducer->mockActiveStreamRequest(
99+
consumerStream->getFlags(),
100+
consumerStream->getOpaque(),
101+
vbid,
102+
consumerStream->getStartSeqno(),
103+
consumerStream->getEndSeqno(),
104+
consumerStream->getVBucketUUID(),
105+
consumerStream->getSnapStartSeqno(),
106+
consumerStream->getSnapEndSeqno());
107+
producerStream = dynamic_cast<MockActiveStream*>(
108+
mockProducer->findStream(vbid).get());
109+
110+
// // Both streams created. Check state is as expected.
111+
ASSERT_TRUE(producerStream->isInTakeoverSend())
112+
<< "Producer stream state should have transitioned to TakeoverSend";
113+
ASSERT_EQ(2, sourceVb->checkpointManager.getNumOfCursors())
114+
<< "Should have both persistence and DCP producer cursor on source "
115+
"VB";
116+
117+
// Creating a producer will schedule one ActiveStreamCheckpointProcessorTask
118+
// that task though sleeps forever, so won't run until woken.
119+
ASSERT_EQ(1, getLpAuxQ()->getFutureQueueSize());
120+
}
121+
122+
void TearDown() override {
123+
producer->cancelCheckpointCreatorTask();
124+
producer->closeAllStreams();
125+
producer.reset();
126+
127+
consumer->closeAllStreams();
128+
shutdownAndPurgeTasks(replicaEngine.get());
129+
destroy_mock_cookie(cookie);
130+
cookie = nullptr;
131+
replicaEngine.reset();
132+
SingleThreadedEPStoreTest::TearDown();
133+
}
134+
135+
TaskQueue *getLpAuxQ() const {
136+
auto *task_executor =
137+
reinterpret_cast<SingleThreadedExecutorPool *>(ExecutorPool::get());
138+
return task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
139+
}
140+
141+
std::unique_ptr<DcpResponse> getNextProducerMsg(MockActiveStream* stream) {
142+
std::unique_ptr<DcpResponse> producerMsg(stream->next());
143+
if (!producerMsg) {
144+
EXPECT_EQ(1, getLpAuxQ()->getFutureQueueSize())
145+
<< "Expected to have ActiveStreamCheckpointProcessorTask "
146+
"in future queue after null producerMsg";
147+
stream->nextCheckpointItemTask();
148+
EXPECT_GT(stream->getItemsRemaining(), 0)
149+
<< "Expected some items ready after calling nextCheckpointItemTask()";
150+
return getNextProducerMsg(stream);
151+
}
152+
return producerMsg;
153+
}
154+
155+
void readNextConsumerMsgAndSendToProducer(ActiveStream& producerStream,
156+
PassiveStream& consumerStream);
157+
158+
std::unique_ptr<SynchronousEPEngine> replicaEngine;
159+
MockDcpConsumer* consumer;
160+
// Non-owning ptr to consumer stream (owned by consumer).
161+
PassiveStream* consumerStream;
162+
163+
dcp_producer_t producer;
164+
// Non-owning copy of {producer}.
165+
MockDcpProducer* mockProducer;
166+
167+
// Non-owning ptr to producer stream (owned by producer).
168+
MockActiveStream* producerStream;
169+
};
170+
171+
void DCPLoopbackStreamTest::readNextConsumerMsgAndSendToProducer(
172+
ActiveStream& producerStream,
173+
PassiveStream& consumerStream) {
174+
std::unique_ptr<DcpResponse> consumerMsg(consumerStream.next());
175+
176+
// Pass the consumer's message to the producer.
177+
if (consumerMsg) {
178+
switch (consumerMsg->getEvent()) {
179+
case DCP_SNAPSHOT_MARKER:
180+
producerStream.snapshotMarkerAckReceived();
181+
break;
182+
case DCP_SET_VBUCKET:
183+
producerStream.setVBucketStateAckRecieved();
184+
break;
185+
default:
186+
FAIL();
187+
}
188+
}
189+
}
190+
191+
/**
192+
* Test the behavour of a Takeover stream between a DcpProducer and DcpConsumer.
193+
*
194+
* Creates a Producer and Consumer; along with a single Active -> Passive
195+
* stream, then makes a streamRequest (simulating what ns_server normally does).
196+
* Then loops; reading messages from the producer and passing them to the
197+
* consumer, and reading responses from the consumer and passing to the
198+
* producer. Test finishes when the PassiveStream is set to Dead - at that point
199+
* the vBucket should be active on the destination; and dead on the source.
200+
*/
201+
TEST_F(DCPLoopbackStreamTest, Takeover) {
202+
while (true) {
203+
auto producerMsg = getNextProducerMsg(producerStream);
204+
205+
// Pass the message onto the consumer.
206+
EXPECT_EQ(ENGINE_SUCCESS,
207+
consumerStream->messageReceived(producerMsg.release()));
208+
209+
// Get the next message from the consumer; and pass to the producer.
210+
readNextConsumerMsgAndSendToProducer(*producerStream, *consumerStream);
211+
212+
// Check consumer stream state - drop reflecting messages when
213+
// stream goes dead.
214+
if (consumerStream->getState() == STREAM_DEAD) {
215+
break;
216+
}
217+
}
218+
219+
auto* sourceVb = engine->getVBucket(vbid).get();
220+
EXPECT_EQ(vbucket_state_dead, sourceVb->getState())
221+
<< "Expected producer vBucket to be dead once stream "
222+
"transitions to dead.";
223+
224+
auto* destVb = replicaEngine->getVBucket(vbid).get();
225+
EXPECT_EQ(vbucket_state_active, destVb->getState())
226+
<< "Expected consumer vBucket to be active once stream "
227+
"transitions to dead.";
228+
}

0 commit comments

Comments
 (0)