Skip to content

Commit 2c0d03e

Browse files
committed
Merge branch 'spock' into vulcan
* spock: MB-29483: Disable DCP cursor dropping MB-29287: Add dcp_reflection_test MB-29287: Move SynchronousEPEngine building to seperate method MB-29287: Add string / streaming methods for DCP responses Change-Id: I042e476069651df511dc9c0273ff053296dd2b1b
2 parents 8066527 + b8939ea commit 2c0d03e

File tree

13 files changed

+298
-36
lines changed

13 files changed

+298
-36
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
286286
tests/module_tests/collections/vbucket_manifest_entry_test.cc
287287
tests/module_tests/configuration_test.cc
288288
tests/module_tests/defragmenter_test.cc
289+
tests/module_tests/dcp_reflection_test.cc
289290
tests/module_tests/dcp_test.cc
290291
tests/module_tests/ep_unit_tests_main.cc
291292
tests/module_tests/ephemeral_bucket_test.cc

engines/ep/src/dcp/consumer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine& engine,
170170
pendingSendNoopInterval = config.isDcpEnableNoop();
171171
pendingSetPriority = true;
172172
pendingEnableExtMetaData = true;
173-
pendingSupportCursorDropping = true;
173+
// MB-29369: Don't request cursor dropping.
174+
pendingSupportCursorDropping = false;
174175
pendingSupportHifiMFU =
175176
(config.getHtEvictionPolicy() == "hifi_mfu");
176177
}

engines/ep/src/dcp/producer.cc

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -215,15 +215,10 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine& e,
215215
enableExtMetaData = false;
216216
forceValueCompression = false;
217217

218-
// Cursor dropping is disabled for replication connections by default,
219-
// but will be enabled through a control message to support backward
220-
// compatibility. For all other type of DCP connections, cursor dropping
221-
// will be enabled by default.
222-
if (name.find("replication") < name.length()) {
223-
supportsCursorDropping = false;
224-
} else {
225-
supportsCursorDropping = true;
226-
}
218+
// MB-29369: Cursor dropping is currently disabled for all
219+
// connections due to race condition which can result in skipping
220+
// mutations.
221+
supportsCursorDropping = false;
227222

228223
backfillMgr.reset(new BackfillManager(engine_));
229224
}
@@ -745,11 +740,8 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
745740
}
746741
return ENGINE_SUCCESS;
747742
} else if (strncmp(param, "supports_cursor_dropping", nkey) == 0) {
748-
if (valueStr == "true") {
749-
supportsCursorDropping = true;
750-
} else {
751-
supportsCursorDropping = false;
752-
}
743+
// MB-29369: Cursor dropping currently disabled. Ignore requests
744+
// to enable.
753745
return ENGINE_SUCCESS;
754746
} else if (strncmp(param, "supports_hifi_MFU", nkey) == 0) {
755747
supportsHifiMFU = (valueStr == "true");

engines/ep/src/dcp/response.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,9 @@ uint32_t MutationResponse::getMessageSize() const {
8383

8484
return header + body;
8585
}
86+
87+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
88+
os << "DcpResponse[" << &r << "] with"
89+
<< " event:" << r.to_string();
90+
return os;
91+
}

engines/ep/src/dcp/response.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class DcpResponse {
8282
return opaque_;
8383
}
8484

85-
Event getEvent() {
85+
Event getEvent() const {
8686
return event_;
8787
}
8888

@@ -148,6 +148,8 @@ class DcpResponse {
148148
Event event_;
149149
};
150150

151+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r);
152+
151153
class StreamRequest : public DcpResponse {
152154
public:
153155
StreamRequest(uint16_t vbucket, uint32_t opaque, uint32_t flags,

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2686,6 +2686,9 @@ static test_result test_dcp_agg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
26862686

26872687
static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
26882688
ENGINE_HANDLE_V1 *h1) {
2689+
// MB-29369: Cursor dropping currently disabled
2690+
return SKIPPED;
2691+
26892692
/* Initially write a few items */
26902693
int num_items = 25;
26912694
const int initialSnapshotSize = num_items;
@@ -2783,6 +2786,9 @@ static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
27832786

27842787
static test_result test_dcp_cursor_dropping_backfill(ENGINE_HANDLE *h,
27852788
ENGINE_HANDLE_V1 *h1) {
2789+
// MB-29369: Cursor dropping currently disabled
2790+
return SKIPPED;
2791+
27862792
/* Initially write a few items */
27872793
int num_items = 50;
27882794
const int initialSnapshotSize = num_items;
@@ -2979,10 +2985,13 @@ static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
29792985
cb_assert(dcp_last_key.compare("enable_ext_metadata") == 0);
29802986
cb_assert(dcp_last_opaque != opaque);
29812987

2988+
// MB-29369: Cursor dropping currently disabled.
2989+
#if 0
29822990
dcp_step(h, h1, cookie);
29832991
cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
29842992
cb_assert(dcp_last_key.compare("supports_cursor_dropping") == 0);
29852993
cb_assert(dcp_last_opaque != opaque);
2994+
#endif
29862995

29872996
dcp_step(h, h1, cookie);
29882997
cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);

engines/ep/tests/mock/mock_synchronous_ep_engine.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ void SynchronousEPEngine::setDcpConnMap(
6868
dcpConnMap_ = std::move(dcpConnMap);
6969
}
7070

71+
std::unique_ptr<SynchronousEPEngine> SynchronousEPEngine::build(
72+
const std::string& config) {
73+
std::unique_ptr<SynchronousEPEngine> engine(
74+
new SynchronousEPEngine(config));
75+
76+
// switch current thread to this new engine, so all sub-created objects
77+
// are accounted in it's mem_used.
78+
ObjectRegistry::onSwitchThread(engine.get());
79+
80+
engine->setKVBucket(
81+
engine->public_makeMockBucket(engine->getConfiguration()));
82+
83+
// Ensure that EPEngine is told about necessary server callbacks
84+
// (client disconnect, bucket delete).
85+
engine->public_initializeEngineCallbacks();
86+
87+
return engine;
88+
}
89+
7190
void SynchronousEPEngine::initializeConnmap() {
7291
dcpConnMap_->initialize();
7392
}

engines/ep/tests/mock/mock_synchronous_ep_engine.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ class SynchronousEPEngine : public EventuallyPersistentEngine {
4141
void setKVBucket(std::unique_ptr<KVBucket> store);
4242
void setDcpConnMap(std::unique_ptr<DcpConnMap> dcpConnMap);
4343

44+
/// Constructs a SynchronousEPEngine instance, along with the necessary
45+
/// sub-components.
46+
static std::unique_ptr<SynchronousEPEngine> build(
47+
const std::string& config);
48+
4449
/* Allow us to call normally protected methods */
4550

4651
ENGINE_ERROR_CODE public_doDcpVbTakeoverStats(const void* cookie,
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
/*
19+
* Unit tests for DCP which connecting a DCP Producer to a DCP Consumer.
20+
*/
21+
22+
#include <memcached/protocol_binary.h>
23+
#include <programs/engine_testapp/mock_server.h>
24+
#include <tests/mock/mock_dcp_consumer.h>
25+
#include <tests/mock/mock_dcp_producer.h>
26+
#include <tests/mock/mock_stream.h>
27+
28+
#include "checkpoint.h"
29+
#include "evp_store_single_threaded_test.h"
30+
#include "test_helpers.h"
31+
32+
/**
33+
* Test fixture which creates two ep-engine (bucket) instances, using one
34+
* as a source for DCP replication and the second as the destination.
35+
*/
36+
class DCPLoopbackStreamTest : public SingleThreadedKVBucketTest {
37+
protected:
38+
void SetUp() override {
39+
SingleThreadedKVBucketTest::SetUp();
40+
41+
// In addition to the initial engine which is created; we also need
42+
// to create a second bucket instance for the destination (replica)
43+
// vBucket.
44+
std::string config = config_string;
45+
if (config.size() > 0) {
46+
config += ";";
47+
}
48+
config += "dbname=" + std::string(test_dbname) + "-replica";
49+
replicaEngine = SynchronousEPEngine::build(config);
50+
51+
setupProducerAndConsumerStreams();
52+
}
53+
54+
void setupProducerAndConsumerStreams() {
55+
// Setup the source (active) and destination (replica) Buckets.
56+
EXPECT_EQ(ENGINE_SUCCESS,
57+
engine->getKVBucket()->setVBucketState(
58+
vbid,
59+
vbucket_state_active, /*transfer*/
60+
false));
61+
EXPECT_EQ(ENGINE_SUCCESS,
62+
replicaEngine->getKVBucket()->setVBucketState(
63+
vbid,
64+
vbucket_state_replica, /*transfer*/
65+
false));
66+
67+
auto& sourceVb = *engine->getVBucket(vbid);
68+
69+
// Add some items to replicate / takeover to the source Bucket.
70+
store_item(vbid, makeStoredDocKey("key1"), "value");
71+
store_item(vbid, makeStoredDocKey("key2"), "value");
72+
store_item(vbid, makeStoredDocKey("key3"), "value");
73+
74+
// Setup the consumer.
75+
consumer = std::make_shared<MockDcpConsumer>(
76+
*replicaEngine, cookie, "test_consumer");
77+
EXPECT_EQ(ENGINE_SUCCESS,
78+
consumer->addStream(
79+
/*opaque*/ 0, vbid, DCP_ADD_STREAM_FLAG_TAKEOVER));
80+
consumerStream = consumer->getVbucketStream(vbid).get();
81+
82+
// Need to discard the first message from the consumerStream (the
83+
// StreamRequest), as we'll manually set that up in the producer.
84+
{
85+
std::unique_ptr<DcpResponse> streamRequest(consumerStream->next());
86+
EXPECT_NE(nullptr, streamRequest);
87+
EXPECT_EQ(DcpResponse::Event::StreamReq, streamRequest->getEvent());
88+
}
89+
90+
// Create the Dcp producer.
91+
producer = SingleThreadedKVBucketTest::createDcpProducer(
92+
cookie,
93+
{},
94+
/*dcpCollectionAware*/ false,
95+
IncludeDeleteTime::No);
96+
producer->scheduleCheckpointProcessorTask();
97+
98+
producer->mockActiveStreamRequest(consumerStream->getFlags(),
99+
consumerStream->getOpaque(),
100+
sourceVb,
101+
consumerStream->getStartSeqno(),
102+
consumerStream->getEndSeqno(),
103+
consumerStream->getVBucketUUID(),
104+
consumerStream->getSnapStartSeqno(),
105+
consumerStream->getSnapEndSeqno());
106+
producerStream = dynamic_cast<MockActiveStream*>(
107+
producer->findStream(vbid).get());
108+
109+
// // Both streams created. Check state is as expected.
110+
ASSERT_TRUE(producerStream->isTakeoverSend())
111+
<< "Producer stream state should have transitioned to "
112+
"TakeoverSend";
113+
ASSERT_EQ(2, sourceVb.checkpointManager->getNumOfCursors())
114+
<< "Should have both persistence and DCP producer cursor on "
115+
"source "
116+
"VB";
117+
118+
// Creating a producer will schedule one
119+
// ActiveStreamCheckpointProcessorTask
120+
// that task though sleeps forever, so won't run until woken.
121+
ASSERT_EQ(1, getLpAuxQ()->getFutureQueueSize());
122+
}
123+
124+
void TearDown() override {
125+
producer->cancelCheckpointCreatorTask();
126+
producer->closeAllStreams();
127+
producer.reset();
128+
129+
consumer->closeAllStreams();
130+
consumer.reset();
131+
shutdownAndPurgeTasks(replicaEngine.get());
132+
destroy_mock_cookie(cookie);
133+
cookie = nullptr;
134+
replicaEngine.reset();
135+
SingleThreadedKVBucketTest::TearDown();
136+
}
137+
138+
TaskQueue* getLpAuxQ() const {
139+
auto* task_executor = reinterpret_cast<SingleThreadedExecutorPool*>(
140+
ExecutorPool::get());
141+
return task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
142+
}
143+
144+
std::unique_ptr<DcpResponse> getNextProducerMsg(MockActiveStream* stream) {
145+
std::unique_ptr<DcpResponse> producerMsg(stream->next());
146+
if (!producerMsg) {
147+
EXPECT_EQ(1, getLpAuxQ()->getFutureQueueSize())
148+
<< "Expected to have ActiveStreamCheckpointProcessorTask "
149+
"in future queue after null producerMsg";
150+
stream->nextCheckpointItemTask();
151+
EXPECT_GT(stream->getItemsRemaining(), 0)
152+
<< "Expected some items ready after calling "
153+
"nextCheckpointItemTask()";
154+
return getNextProducerMsg(stream);
155+
}
156+
return producerMsg;
157+
}
158+
159+
void readNextConsumerMsgAndSendToProducer(ActiveStream& producerStream,
160+
PassiveStream& consumerStream);
161+
162+
std::unique_ptr<SynchronousEPEngine> replicaEngine;
163+
std::shared_ptr<MockDcpConsumer> consumer;
164+
// Non-owning ptr to consumer stream (owned by consumer).
165+
PassiveStream* consumerStream;
166+
167+
std::shared_ptr<MockDcpProducer> producer;
168+
169+
// Non-owning ptr to producer stream (owned by producer).
170+
MockActiveStream* producerStream;
171+
};
172+
173+
void DCPLoopbackStreamTest::readNextConsumerMsgAndSendToProducer(
174+
ActiveStream& producerStream, PassiveStream& consumerStream) {
175+
std::unique_ptr<DcpResponse> consumerMsg(consumerStream.next());
176+
177+
// Pass the consumer's message to the producer.
178+
if (consumerMsg) {
179+
switch (consumerMsg->getEvent()) {
180+
case DcpResponse::Event::SnapshotMarker:
181+
producerStream.snapshotMarkerAckReceived();
182+
break;
183+
case DcpResponse::Event::SetVbucket:
184+
producerStream.setVBucketStateAckRecieved();
185+
break;
186+
default:
187+
FAIL();
188+
}
189+
}
190+
}
191+
192+
/**
193+
* Test the behavour of a Takeover stream between a DcpProducer and DcpConsumer.
194+
*
195+
* Creates a Producer and Consumer; along with a single Active -> Passive
196+
* stream, then makes a streamRequest (simulating what ns_server normally does).
197+
* Then loops; reading messages from the producer and passing them to the
198+
* consumer, and reading responses from the consumer and passing to the
199+
* producer. Test finishes when the PassiveStream is set to Dead - at that point
200+
* the vBucket should be active on the destination; and dead on the source.
201+
*/
202+
TEST_F(DCPLoopbackStreamTest, Takeover) {
203+
while (true) {
204+
auto producerMsg = getNextProducerMsg(producerStream);
205+
206+
// Pass the message onto the consumer.
207+
EXPECT_EQ(ENGINE_SUCCESS,
208+
consumerStream->messageReceived(std::move(producerMsg)));
209+
210+
// Get the next message from the consumer; and pass to the producer.
211+
readNextConsumerMsgAndSendToProducer(*producerStream, *consumerStream);
212+
213+
// Check consumer stream state - drop reflecting messages when
214+
// stream goes dead.
215+
if (!consumerStream->isActive()) {
216+
break;
217+
}
218+
}
219+
220+
auto* sourceVb = engine->getVBucket(vbid).get();
221+
EXPECT_EQ(vbucket_state_dead, sourceVb->getState())
222+
<< "Expected producer vBucket to be dead once stream "
223+
"transitions to dead.";
224+
225+
auto* destVb = replicaEngine->getVBucket(vbid).get();
226+
EXPECT_EQ(vbucket_state_active, destVb->getState())
227+
<< "Expected consumer vBucket to be active once stream "
228+
"transitions to dead.";
229+
}

0 commit comments

Comments
 (0)