Skip to content

Commit 238af6b

Browse files
committed
Merge remote-tracking branch 'ep-engine/watson' into kv_engine/watson_ep
* ep-engine/watson: MB-29483: Disable DCP cursor dropping MB-29287: Add dcp_reflection_test MB-29287: Move SynchronousEPEngine building to seperate method MB-29287: Add string / streaming methods for DCP responses Change-Id: Ic4d36779ef5281cdc4cf2a3bc4275e291a5ad2c5
2 parents d3ea9f4 + 5686159 commit 238af6b

File tree

13 files changed

+330
-35
lines changed

13 files changed

+330
-35
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
189189
tests/module_tests/checkpoint_test.cc
190190
tests/module_tests/defragmenter_test.cc
191191
tests/module_tests/ep_unit_tests_main.cc
192+
tests/module_tests/dcp_reflection_test.cc
192193
tests/module_tests/dcp_test.cc
193194
tests/module_tests/evp_engine_test.cc
194195
tests/module_tests/evp_store_rollback_test.cc

src/dcp/consumer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine &engine, const void *cookie,
123123
pendingSetPriority = true;
124124
pendingEnableExtMetaData = true;
125125
pendingEnableValueCompression = config.isDcpValueCompressionEnabled();
126-
pendingSupportCursorDropping = true;
126+
// MB-29369: Don't request cursor dropping.
127+
pendingSupportCursorDropping = false;
127128

128129
ExTask task = new Processor(&engine, this, 1);
129130
processorTaskId = ExecutorPool::get()->schedule(task, NONIO_TASK_IDX);

src/dcp/producer.cc

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,10 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
172172
enableExtMetaData = false;
173173
enableValueCompression = false;
174174

175-
// Cursor dropping is disabled for replication connections by default,
176-
// but will be enabled through a control message to support backward
177-
// compatibility. For all other type of DCP connections, cursor dropping
178-
// will be enabled by default.
179-
if (name.find("replication") < name.length()) {
180-
supportsCursorDropping = false;
181-
} else {
182-
supportsCursorDropping = true;
183-
}
175+
// MB-29369: Cursor dropping is currently disabled for all
176+
// connections due to race condition which can result in skipping
177+
// mutations.
178+
supportsCursorDropping = false;
184179

185180
backfillMgr.reset(new BackfillManager(&engine_));
186181

@@ -584,11 +579,8 @@ ENGINE_ERROR_CODE DcpProducer::control(uint32_t opaque, const void* key,
584579
}
585580
return ENGINE_SUCCESS;
586581
} else if (strncmp(param, "supports_cursor_dropping", nkey) == 0) {
587-
if (valueStr == "true") {
588-
supportsCursorDropping = true;
589-
} else {
590-
supportsCursorDropping = false;
591-
}
582+
// MB-29369: Cursor dropping currently disabled. Ignore requests
583+
// to enable.
592584
return ENGINE_SUCCESS;
593585
} else if (strncmp(param, "set_noop_interval", nkey) == 0) {
594586
if (parseUint32(valueStr.c_str(), &noopCtx.noopInterval)) {

src/dcp/response.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,34 @@ const uint32_t SetVBucketState::baseMsgBytes = 25;
3535
const uint32_t SnapshotMarker::baseMsgBytes = 44;
3636
const uint32_t MutationResponse::mutationBaseMsgBytes = 55;
3737
const uint32_t MutationResponse::deletionBaseMsgBytes = 42;
38+
39+
40+
std::string to_string(dcp_event_t event) {
41+
switch (event) {
42+
case DCP_MUTATION:
43+
return "DCP_MUTATION";
44+
case DCP_DELETION:
45+
return "DCP_DELETION";
46+
case DCP_EXPIRATION:
47+
return "DCP_EXPIRATION";
48+
case DCP_FLUSH:
49+
return "DCP_FLUSH";
50+
case DCP_SET_VBUCKET:
51+
return "DCP_SET_VBUCKET";
52+
case DCP_STREAM_REQ:
53+
return "DCP_STREAM_REQ";
54+
case DCP_STREAM_END:
55+
return "DCP_STREAM_END";
56+
case DCP_SNAPSHOT_MARKER:
57+
return "DCP_SNAPSHOT_MARKER";
58+
case DCP_ADD_STREAM:
59+
return "DCP_ADD_STREAM";
60+
}
61+
return "<invalid_dcp_event_t>(" + std::to_string(int(event)) + ")";
62+
}
63+
64+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
65+
os << "DcpResponse[" << &r << "] with"
66+
<< " event:" << to_string(r.getEvent());
67+
return os;
68+
}

src/dcp/response.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ enum dcp_event_t {
3535
DCP_ADD_STREAM
3636
};
3737

38+
std::string to_string(dcp_event_t event);
3839

3940
enum dcp_marker_flag_t {
4041
MARKER_FLAG_MEMORY = 0x01,
@@ -59,7 +60,7 @@ class DcpResponse {
5960
return opaque_;
6061
}
6162

62-
dcp_event_t getEvent() {
63+
dcp_event_t getEvent() const {
6364
return event_;
6465
}
6566

@@ -95,6 +96,8 @@ class DcpResponse {
9596
dcp_event_t event_;
9697
};
9798

99+
std::ostream& operator<<(std::ostream& os, const DcpResponse& r);
100+
98101
class StreamRequest : public DcpResponse {
99102
public:
100103
StreamRequest(uint16_t vbucket, uint32_t opaque, uint32_t flags,

src/dcp/stream.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ bool Stream::isInMemory() const {
7070
return state_.load() == STREAM_IN_MEMORY;
7171
}
7272

73+
bool Stream::isInTakeoverSend() const {
74+
return state_.load() == STREAM_TAKEOVER_SEND;
75+
}
76+
7377
void Stream::clear_UNLOCKED() {
7478
while (!readyQ.empty()) {
7579
DcpResponse* resp = readyQ.front();

src/dcp/stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ class Stream : public RCValue {
139139
/// @Returns true if state_ is InMemory
140140
bool isInMemory() const;
141141

142+
bool isInTakeoverSend() const;
143+
142144
void clear() {
143145
LockHolder lh(streamMutex);
144146
clear_UNLOCKED();

src/ep_engine.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,6 +1025,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
10251025
int& keyOffset);
10261026

10271027
SERVER_HANDLE_V1 *serverApi;
1028+
// epstore is an owning pointer.
10281029
EventuallyPersistentStore *epstore;
10291030
WorkLoadPolicy *workload;
10301031
bucket_priority_t workloadPriority;

tests/ep_testsuite_dcp.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2321,6 +2321,9 @@ static test_result test_dcp_agg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
23212321

23222322
static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
23232323
ENGINE_HANDLE_V1 *h1) {
2324+
// MB-29369: Cursor dropping currently disabled
2325+
return SKIPPED;
2326+
23242327
/* Initially write a few items */
23252328
int num_items = 25;
23262329
const int cursor_dropping_mem_thres_perc = 90;
@@ -2407,6 +2410,9 @@ static test_result test_dcp_cursor_dropping(ENGINE_HANDLE *h,
24072410

24082411
static test_result test_dcp_cursor_dropping_backfill(ENGINE_HANDLE *h,
24092412
ENGINE_HANDLE_V1 *h1) {
2413+
// MB-29369: Cursor dropping currently disabled
2414+
return SKIPPED;
2415+
24102416
/* Initially write a few items */
24112417
int num_items = 50;
24122418
const int cursor_dropping_mem_thres_perc = 90;
@@ -2907,10 +2913,13 @@ static uint32_t add_stream_for_consumer(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
29072913
cb_assert(dcp_last_opaque != opaque);
29082914
}
29092915

2916+
// MB-29369: Cursor dropping currently disabled.
2917+
#if 0
29102918
dcp_step(h, h1, cookie);
29112919
cb_assert(dcp_last_op == PROTOCOL_BINARY_CMD_DCP_CONTROL);
29122920
cb_assert(dcp_last_key.compare("supports_cursor_dropping") == 0);
29132921
cb_assert(dcp_last_opaque != opaque);
2922+
#endif
29142923

29152924
checkeq(ENGINE_SUCCESS,
29162925
h1->dcp.add_stream(h, cookie, opaque, vbucket, flags),

0 commit comments

Comments
 (0)