Skip to content

Commit 11490ba

Browse files
committed
MB-29483: Merge remote-tracking branch 'review/watson_ep' into 5.1.0
* review/watson_ep: MB-29483: Disable DCP cursor dropping MB-29287: Add dcp_reflection_test MB-29287: Move SynchronousEPEngine building to seperate method MB-29287: Add string / streaming methods for DCP responses Change-Id: I57cf5c902ac3eee7f4cf2eb25741fe9022679039
2 parents 0154632 + 238af6b commit 11490ba

File tree

13 files changed

+296
-34
lines changed

13 files changed

+296
-34
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
257257
tests/module_tests/collections/vbucket_manifest_entry_test.cc
258258
tests/module_tests/configuration_test.cc
259259
tests/module_tests/defragmenter_test.cc
260+
tests/module_tests/dcp_reflection_test.cc
260261
tests/module_tests/dcp_test.cc
261262
tests/module_tests/ep_unit_tests_main.cc
262263
tests/module_tests/ephemeral_bucket_test.cc

engines/ep/src/dcp/consumer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
152152
pendingSetPriority = true;
153153
pendingEnableExtMetaData = true;
154154
pendingEnableValueCompression = config.isDcpValueCompressionEnabled();
155-
pendingSupportCursorDropping = true;
155+
// MB-29369: Don't request cursor dropping.
156+
pendingSupportCursorDropping = false;
156157

157158
ExTask task = std::make_shared<DcpConsumerTask>(&engine, this, 1);
158159
processorTaskId = ExecutorPool::get()->schedule(task);

engines/ep/src/dcp/producer.cc

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,10 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
192192
enableExtMetaData = false;
193193
enableValueCompression = false;
194194

195-
// Cursor dropping is disabled for replication connections by default,
196-
// but will be enabled through a control message to support backward
197-
// compatibility. For all other type of DCP connections, cursor dropping
198-
// will be enabled by default.
199-
if (name.find("replication") < name.length()) {
200-
supportsCursorDropping = false;
201-
} else {
202-
supportsCursorDropping = true;
203-
}
195+
// MB-29369: Cursor dropping is currently disabled for all
196+
// connections due to race condition which can result in skipping
197+
// mutations.
198+
supportsCursorDropping = false;
204199

205200
backfillMgr.reset(new BackfillManager(engine_));
206201

@@ -724,11 +719,8 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
724719
}
725720
return ENGINE_SUCCESS;
726721
} else if (strncmp(param, "supports_cursor_dropping", nkey) == 0) {
727-
if (valueStr == "true") {
728-
supportsCursorDropping = true;
729-
} else {
730-
supportsCursorDropping = false;
731-
}
722+
// MB-29369: Cursor dropping currently disabled. Ignore requests
723+
// to enable.
732724
return ENGINE_SUCCESS;
733725
} else if (strncmp(param, "set_noop_interval", nkey) == 0) {
734726
uint32_t noopInterval;

engines/ep/src/dcp/response.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,9 @@ uint32_t MutationResponse::getMessageSize() const {
9595

9696
return header + body;
9797
}
98+
99+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
100+
os << "DcpResponse[" << &r << "] with"
101+
<< " event:" << r.to_string();
102+
return os;
103+
}

engines/ep/src/dcp/response.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class DcpResponse {
7676
return opaque_;
7777
}
7878

79-
Event getEvent() {
79+
Event getEvent() const {
8080
return event_;
8181
}
8282

@@ -142,6 +142,8 @@ class DcpResponse {
142142
Event event_;
143143
};
144144

145+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r);
146+
145147
class StreamRequest : public DcpResponse {
146148
public:
147149
StreamRequest(uint16_t vbucket, uint32_t opaque, uint32_t flags,

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2614,6 +2614,9 @@ static test_result test_dcp_agg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
26142614

26152615
static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
26162616
ENGINE_HANDLE_V1 *h1) {
2617+
// MB-29369: Cursor dropping currently disabled
2618+
return SKIPPED;
2619+
26172620
/* Initially write a few items */
26182621
int num_items = 25;
26192622
const int initialSnapshotSize = num_items;
@@ -2711,6 +2714,9 @@ static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
27112714

27122715
static test_result test_dcp_cursor_dropping_backfill(ENGINE_HANDLE *h,
27132716
ENGINE_HANDLE_V1 *h1) {
2717+
// MB-29369: Cursor dropping currently disabled
2718+
return SKIPPED;
2719+
27142720
/* Initially write a few items */
27152721
int num_items = 50;
27162722
const int initialSnapshotSize = num_items;
@@ -3033,10 +3039,13 @@ static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
30333039
cb_assert(dcp_last_opaque != opaque);
30343040
}
30353041

3042+
// MB-29369: Cursor dropping currently disabled.
3043+
#if 0
30363044
dcp_step(h, h1, cookie);
30373045
cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
30383046
cb_assert(dcp_last_key.compare("supports_cursor_dropping") == 0);
30393047
cb_assert(dcp_last_opaque != opaque);
3048+
#endif
30403049

30413050
checkeq(ENGINE_SUCCESS,
30423051
h1->dcp.add_stream(h, cookie, opaque, vbucket, flags),

engines/ep/tests/mock/mock_synchronous_ep_engine.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,24 @@ void SynchronousEPEngine::setDcpConnMap(
6666
dcpConnMap_ = std::move(dcpConnMap);
6767
}
6868

69+
std::unique_ptr<SynchronousEPEngine> SynchronousEPEngine::build(
70+
const std::string& config) {
71+
std::unique_ptr<SynchronousEPEngine> engine(
72+
new SynchronousEPEngine(config));
73+
74+
// switch current thread to this new engine, so all sub-created objects
75+
// are accounted in it's mem_used.
76+
ObjectRegistry::onSwitchThread(engine.get());
77+
78+
engine->setKVBucket(engine->public_makeBucket(engine->getConfiguration()));
79+
80+
// Ensure that EPEngine is told about necessary server callbacks
81+
// (client disconnect, bucket delete).
82+
engine->public_initializeEngineCallbacks();
83+
84+
return engine;
85+
}
86+
6987
void SynchronousEPEngine::initializeConnmap() {
7088
dcpConnMap_->initialize();
7189
}

engines/ep/tests/mock/mock_synchronous_ep_engine.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class SynchronousEPEngine : public EventuallyPersistentEngine {
3737
void setKVBucket(std::unique_ptr<KVBucket> store);
3838
void setDcpConnMap(std::unique_ptr<DcpConnMap> dcpConnMap);
3939

40+
/// Constructs a SynchronousEPEngine instance, along with the necessary
41+
/// sub-components.
42+
static std::unique_ptr<SynchronousEPEngine> build(
43+
const std::string& config);
44+
4045
/* Allow us to call normally protected methods */
4146

4247
ENGINE_ERROR_CODE public_doDcpVbTakeoverStats(const void* cookie,
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/*
19+
* Unit tests for DCP which connecting a DCP Producer to a DCP Consumer.
20+
*/
21+
22+
#include <memcached/protocol_binary.h>
23+
#include <programs/engine_testapp/mock_server.h>
24+
#include <tests/mock/mock_dcp_consumer.h>
25+
#include <tests/mock/mock_dcp_producer.h>
26+
#include <tests/mock/mock_stream.h>
27+
28+
#include "evp_store_single_threaded_test.h"
29+
#include "test_helpers.h"
30+
31+
/**
32+
* Test fixture which creates two ep-engine (bucket) instances, using one
33+
* as a source for DCP replication and the second as the destination.
34+
*/
35+
class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
36+
protected:
37+
void SetUp() override {
38+
SingleThreadedKVBucketTest::SetUp();
39+
40+
// In addition to the initial engine which is created; we also need
41+
// to create a second bucket instance for the destination (replica)
42+
// vBucket.
43+
std::string config = config_string;
44+
if (config.size() > 0) {
45+
config += ";";
46+
}
47+
config += "dbname=" + std::string(test_dbname) + "-replica";
48+
replicaEngine = SynchronousEPEngine::build(config);
49+
50+
setupProducerAndConsumerStreams();
51+
}
52+
53+
void setupProducerAndConsumerStreams() {
54+
// Setup the source (active) and destination (replica) Buckets.
55+
EXPECT_EQ(ENGINE_SUCCESS,
56+
engine->getKVBucket()->setVBucketState(
57+
vbid,
58+
vbucket_state_active, /*transfer*/
59+
false));
60+
EXPECT_EQ(ENGINE_SUCCESS,
61+
replicaEngine->getKVBucket()->setVBucketState(
62+
vbid,
63+
vbucket_state_replica, /*transfer*/
64+
false));
65+
66+
auto& sourceVb = *engine->getVBucket(vbid);
67+
68+
// Add some items to replicate / takeover to the source Bucket.
69+
store_item(vbid, makeStoredDocKey("key1"), "value");
70+
store_item(vbid, makeStoredDocKey("key2"), "value");
71+
store_item(vbid, makeStoredDocKey("key3"), "value");
72+
73+
// Setup the consumer.
74+
consumer =
75+
new MockDcpConsumer(*replicaEngine, nullptr, "test_consumer");
76+
EXPECT_EQ(ENGINE_SUCCESS,
77+
consumer->addStream(
78+
/*opaque*/ 0, vbid, DCP_ADD_STREAM_FLAG_TAKEOVER));
79+
consumerStream = consumer->getVbucketStream(vbid).get();
80+
81+
// Need to discard the first message from the consumerStream (the
82+
// StreamRequest), as we'll manually set that up in the producer.
83+
{
84+
std::unique_ptr<DcpResponse> streamRequest(consumerStream->next());
85+
EXPECT_NE(nullptr, streamRequest);
86+
EXPECT_EQ(DcpResponse::Event::StreamReq, streamRequest->getEvent());
87+
}
88+
89+
// Create the Dcp producer.
90+
producer = new MockDcpProducer(*engine,
91+
cookie,
92+
"test_producer",
93+
/*notifyOnly*/ false,
94+
{});
95+
mockProducer = dynamic_cast<MockDcpProducer*>(producer.get());
96+
97+
mockProducer->mockActiveStreamRequest(
98+
consumerStream->getFlags(),
99+
consumerStream->getOpaque(),
100+
sourceVb,
101+
consumerStream->getStartSeqno(),
102+
consumerStream->getEndSeqno(),
103+
consumerStream->getVBucketUUID(),
104+
consumerStream->getSnapStartSeqno(),
105+
consumerStream->getSnapEndSeqno());
106+
producerStream = dynamic_cast<MockActiveStream*>(
107+
mockProducer->findStream(vbid).get());
108+
109+
// // Both streams created. Check state is as expected.
110+
ASSERT_TRUE(producerStream->isTakeoverSend())
111+
<< "Producer stream state should have transitioned to "
112+
"TakeoverSend";
113+
ASSERT_EQ(2, sourceVb.checkpointManager.getNumOfCursors())
114+
<< "Should have both persistence and DCP producer cursor on "
115+
"source "
116+
"VB";
117+
118+
// Creating a producer will schedule one
119+
// ActiveStreamCheckpointProcessorTask
120+
// that task though sleeps forever, so won't run until woken.
121+
ASSERT_EQ(1, getLpAuxQ()->getFutureQueueSize());
122+
}
123+
124+
void TearDown() override {
125+
producer->cancelCheckpointCreatorTask();
126+
producer->closeAllStreams();
127+
producer.reset();
128+
129+
consumer->closeAllStreams();
130+
shutdownAndPurgeTasks(replicaEngine.get());
131+
destroy_mock_cookie(cookie);
132+
cookie = nullptr;
133+
replicaEngine.reset();
134+
SingleThreadedKVBucketTest::TearDown();
135+
}
136+
137+
TaskQueue* getLpAuxQ() const {
138+
auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>(
139+
ExecutorPool::get());
140+
return task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
141+
}
142+
143+
std::unique_ptr<DcpResponse> getNextProducerMsg(MockActiveStream* stream) {
144+
std::unique_ptr<DcpResponse> producerMsg(stream->next());
145+
if (!producerMsg) {
146+
EXPECT_EQ(1, getLpAuxQ()->getFutureQueueSize())
147+
<< "Expected to have ActiveStreamCheckpointProcessorTask "
148+
"in future queue after null producerMsg";
149+
stream->nextCheckpointItemTask();
150+
EXPECT_GT(stream->getItemsRemaining(), 0)
151+
<< "Expected some items ready after calling "
152+
"nextCheckpointItemTask()";
153+
return getNextProducerMsg(stream);
154+
}
155+
return producerMsg;
156+
}
157+
158+
void readNextConsumerMsgAndSendToProducer(ActiveStream& producerStream,
159+
PassiveStream& consumerStream);
160+
161+
std::unique_ptr<SynchronousEPEngine> replicaEngine;
162+
MockDcpConsumer* consumer;
163+
// Non-owning ptr to consumer stream (owned by consumer).
164+
PassiveStream* consumerStream;
165+
166+
dcp_producer_t producer;
167+
// Non-owning copy of {producer}.
168+
MockDcpProducer* mockProducer;
169+
170+
// Non-owning ptr to producer stream (owned by producer).
171+
MockActiveStream* producerStream;
172+
};
173+
174+
void DCPLoopbackStreamTest::readNextConsumerMsgAndSendToProducer(
175+
ActiveStream& producerStream, PassiveStream& consumerStream) {
176+
std::unique_ptr<DcpResponse> consumerMsg(consumerStream.next());
177+
178+
// Pass the consumer's message to the producer.
179+
if (consumerMsg) {
180+
switch (consumerMsg->getEvent()) {
181+
case DcpResponse::Event::SnapshotMarker:
182+
producerStream.snapshotMarkerAckReceived();
183+
break;
184+
case DcpResponse::Event::SetVbucket:
185+
producerStream.setVBucketStateAckRecieved();
186+
break;
187+
default:
188+
FAIL();
189+
}
190+
}
191+
}
192+
193+
/**
194+
* Test the behavour of a Takeover stream between a DcpProducer and DcpConsumer.
195+
*
196+
* Creates a Producer and Consumer; along with a single Active -> Passive
197+
* stream, then makes a streamRequest (simulating what ns_server normally does).
198+
* Then loops; reading messages from the producer and passing them to the
199+
* consumer, and reading responses from the consumer and passing to the
200+
* producer. Test finishes when the PassiveStream is set to Dead - at that point
201+
* the vBucket should be active on the destination; and dead on the source.
202+
*/
203+
TEST_F(DCPLoopbackStreamTest, Takeover) {
204+
while (true) {
205+
auto producerMsg = getNextProducerMsg(producerStream);
206+
207+
// Pass the message onto the consumer.
208+
EXPECT_EQ(ENGINE_SUCCESS,
209+
consumerStream->messageReceived(std::move(producerMsg)));
210+
211+
// Get the next message from the consumer; and pass to the producer.
212+
readNextConsumerMsgAndSendToProducer(*producerStream, *consumerStream);
213+
214+
// Check consumer stream state - drop reflecting messages when
215+
// stream goes dead.
216+
if (!consumerStream->isActive()) {
217+
break;
218+
}
219+
}
220+
221+
auto* sourceVb = engine->getVBucket(vbid).get();
222+
EXPECT_EQ(vbucket_state_dead, sourceVb->getState())
223+
<< "Expected producer vBucket to be dead once stream "
224+
"transitions to dead.";
225+
226+
auto* destVb = replicaEngine->getVBucket(vbid).get();
227+
EXPECT_EQ(vbucket_state_active, destVb->getState())
228+
<< "Expected consumer vBucket to be active once stream "
229+
"transitions to dead.";
230+
}

0 commit comments

Comments
 (0)