Skip to content

Commit 702c0b0

Browse files
committed
MB-34951: Ignore StreamEnd if opaque doesn't match
The DCP option 'send_stream_end_on_client_close_stream' was added via MB-26363 to have the Producer send a STREAM_END message even when the Consumer initiated the close. This was requested by an external DCP client (Go SDK), however the ep-engine DcpConsumer _also_ requested this feature. However this can cause (benign) warnings during rebalance when the initial (backfill) Stream is closed and the takeover stream created. The problem is the following sequence of messages (all for the same vbid): DcpConsumer ns_server proxy DcpProducer <--- 1. CloseStream Req ------ ---- 2. CloseStream Resp ----> ---- 3. CloseStream Req ---> <--- 4. CloseStream Resp --- <--- 5. AddStream(Takeover) -- <--- 6. StreamEnd Request ------------------------------------- ---- 7. StreamEnd Response [KEY_ENOENT] ----------------------> ---- 8. StreamRequest ----------------------------------------> ns_server is not aware of `send_stream_end_on_client_close_stream`; so it doesn't wait for `6. StreamEnd Request` from the Producer before sending the AddStream request to the Consumer. As such, the Consumer has already removed the entry in it's streams map for the given vbucket, so when it receives the StreamEnd request from the Producer it returns KEY_ENOENT error as the (old) opaque doesn't match the (new) opaque for the takeover stream. send_stream_end_on_client_close_stream is useful for ep-engine to have an explicit window on when potential mismatched opaques could appear (after the SteamEnd request no more should be sent from the Producer) - and indeed it's presence is required to address MB-34850. As such we cannot simply disable send_stream_end_on_client_close_stream. The alternative is to simply ignore the StreamEnd if the given opaque no longer matches (i.e. because another stream has already been started). Change-Id: Ib6baf414aa5b4c306f9146818db2d25ac4326f4f Reviewed-on: http://review.couchbase.org/111746 Reviewed-by: Ben Huddleston <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 2e8f227 commit 702c0b0

File tree

2 files changed

+35
-7
lines changed

2 files changed

+35
-7
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,18 +370,26 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque,
370370
auto stream = findStream(vbucket);
371371
if (!stream) {
372372
logger->warn(
373-
"({}) End stream received but no such stream for this "
373+
"({}) End stream received with opaque:{} but no such stream "
374+
"for this "
374375
"vBucket",
375-
vbucket);
376+
vbucket,
377+
opaque);
376378
return ENGINE_KEY_ENOENT;
377379
}
378380

379381
if (stream->getOpaque() != opaque) {
380-
logger->warn("({}) End stream received with opaque {} but expected {}",
381-
vbucket,
382-
opaque,
383-
stream->getOpaque());
384-
return ENGINE_KEY_ENOENT;
382+
// MB-34951: By the time the DcpConsumer receives the StreamEnd from
383+
// the DcpProducer it is possible that ns_server has already started
384+
// a new Stream (with updated opaque) for this vbucket.
385+
// In which case just ignore this StreamEnd message, returning SUCCESS.
386+
logger->info(
387+
"({}) End stream received with opaque {} but current opaque "
388+
"for that vb is {} - ignoring",
389+
vbucket,
390+
opaque,
391+
stream->getOpaque());
392+
return ENGINE_SUCCESS;
385393
}
386394

387395
logger->info("({}) End stream received with reason {}", vbucket, flags);

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3924,6 +3924,26 @@ TEST_F(SingleThreadedEPBucketTest,
39243924
testAllStreamLevelMessages(ENGINE_KEY_ENOENT);
39253925
}
39263926

3927+
// MB-34951: Check that a consumer correctly handles (and ignores) a StreamEnd
3928+
// request from the producer if it has already created a new stream (for the
3929+
// same vb) with a different opaque.
3930+
TEST_F(SingleThreadedEPBucketTest,
3931+
MB_34951_ConsumerRecvStreamEndAfterAddStream) {
3932+
// Setup: Create replica VB and create stream for vbid, then close it
3933+
// and add another stream (same vbid).
3934+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
3935+
auto consumer = std::make_shared<MockDcpConsumer>(*engine, cookie, "conn");
3936+
const int opaque1 = 1;
3937+
ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque1, vbid, {}));
3938+
ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque1, vbid));
3939+
const int opaque2 = 2;
3940+
ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque2, vbid, {}));
3941+
3942+
// Test: Have the producer send a StreamEnd with the "old" opaque.
3943+
EXPECT_EQ(ENGINE_SUCCESS,
3944+
consumer->streamEnd(opaque1, vbid, END_STREAM_CLOSED));
3945+
}
3946+
39273947
TEST_P(STParameterizedBucketTest, produce_delete_times) {
39283948
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
39293949
auto t1 = ep_real_time();

0 commit comments

Comments
 (0)