Skip to content

Commit 840c194

Browse files
committed
MB-34850: Consumer: Accept (but ignore) messages after Close Stream
As identified during Sync Replication testing, it is possible for the ns_server DCP proxy to forward DCP messages from the Producer to the Consumer even after the DCP proxy has sent Close Stream to the Consumer. Currently these messages are treated as erroneous by the DCP Consumer, causing it to send KEY_ENOENT responses back to the DCP Producer (which hasn't started the Stream close process), and hence the DCP Producer treats this as an error and tears down the connection, ultimately resulting in rebalance failing. Ideally these 'extra' messages would never be forwarded by ns_server, however (a) the DCP Documentation[1] _does_ permit this behaviour, and (b) it's non-trivial for ns_server to fix this issue in the DCP proxy as two different processes handle the producer and consumer proxying, and do not share state. Instead this is addressed in ep-engine - any stream-level messages received between the Close Stream request and the later Stream End request should be silently ignored. [1]: https://github.com/couchbase/kv_engine/blob/master/docs/dcp/documentation/commands/close-stream.md Change-Id: I43700d534bb8072dec707ef143e89c308030ff64 Reviewed-on: http://review.couchbase.org/111745 Tested-by: Build Bot <[email protected]> Reviewed-by: James Harrison <[email protected]> Reviewed-by: Ben Huddleston <[email protected]>
1 parent fa4e901 commit 840c194

File tree

4 files changed

+175
-69
lines changed

4 files changed

+175
-69
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,9 @@ ENGINE_ERROR_CODE DcpConsumer::closeStream(uint32_t opaque,
350350

351351
uint32_t bytesCleared = stream->setDead(END_STREAM_CLOSED);
352352
flowControl.incrFreedBytes(bytesCleared);
353-
removeStream(vbucket);
353+
// Note the stream is not yet removed from the `streams` map; as we need to
354+
// handle (but ignore) any in-flight messages from the Producer until
355+
// STREAM_END is received.
354356
scheduleNotifyIfNecessary();
355357

356358
return ENGINE_SUCCESS;
@@ -374,12 +376,6 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque,
374376
return ENGINE_KEY_ENOENT;
375377
}
376378

377-
if (!stream->isActive()) {
378-
logger->warn("({}) End stream received but stream is not active",
379-
vbucket);
380-
return ENGINE_KEY_ENOENT;
381-
}
382-
383379
if (stream->getOpaque() != opaque) {
384380
logger->warn("({}) End stream received with opaque {} but expected {}",
385381
vbucket,
@@ -395,7 +391,16 @@ ENGINE_ERROR_CODE DcpConsumer::streamEnd(uint32_t opaque,
395391
static_cast<end_stream_status_t>(flags),
396392
vbucket,
397393
cb::mcbp::DcpStreamId{});
398-
return lookupStreamAndDispatchMessage(ufc, vbucket, opaque, std::move(msg));
394+
auto res = lookupStreamAndDispatchMessage(
395+
ufc, vbucket, opaque, std::move(msg));
396+
397+
if (res == ENGINE_SUCCESS) {
398+
// Stream End message successfully passed to stream. Can now remove
399+
// the stream from the streams map as it has completed its lifetime.
400+
removeStream(vbucket);
401+
}
402+
403+
return res;
399404
}
400405

401406
ENGINE_ERROR_CODE DcpConsumer::processMutationOrPrepare(
@@ -540,64 +545,66 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
540545
return ENGINE_EINVAL;
541546
}
542547

543-
ENGINE_ERROR_CODE err = ENGINE_KEY_ENOENT;
544548
auto stream = findStream(vbucket);
545-
if (stream && stream->getOpaque() == opaque && stream->isActive()) {
546-
queued_item item(Item::makeDeletedItem(deletionCause,
547-
key,
548-
0,
549-
deleteTime,
550-
value.data(),
551-
value.size(),
552-
datatype,
553-
cas,
554-
bySeqno,
555-
vbucket,
556-
revSeqno));
557-
558-
// MB-29040: Producer may send deleted doc with value that still has
559-
// the user xattrs and the body. Fix up that mistake by running the
560-
// expiry hook which will correctly process the document
561-
if (value.size()) {
562-
if (mcbp::datatype::is_xattr(datatype)) {
563-
auto vb = engine_.getVBucket(vbucket);
564-
if (vb) {
565-
engine_.getKVBucket()->runPreExpiryHook(*vb, *item);
566-
}
567-
} else {
568-
// MB-31141: Deletes cannot have a value
569-
item->replaceValue(Blob::New(0));
570-
item->setDataType(PROTOCOL_BINARY_RAW_BYTES);
571-
}
572-
}
549+
if (!stream || (stream->getOpaque() != opaque)) {
550+
// No stream for this vBucket / opaque - return ENOENT to indicate this.
551+
return ENGINE_KEY_ENOENT;
552+
}
573553

574-
std::unique_ptr<ExtendedMetaData> emd;
575-
if (meta.size() > 0) {
576-
emd = std::make_unique<ExtendedMetaData>(meta.data(), meta.size());
577-
if (emd->getStatus() == ENGINE_EINVAL) {
578-
err = ENGINE_EINVAL;
554+
queued_item item(Item::makeDeletedItem(deletionCause,
555+
key,
556+
0,
557+
deleteTime,
558+
value.data(),
559+
value.size(),
560+
datatype,
561+
cas,
562+
bySeqno,
563+
vbucket,
564+
revSeqno));
565+
566+
// MB-29040: Producer may send deleted doc with value that still has
567+
// the user xattrs and the body. Fix up that mistake by running the
568+
// expiry hook which will correctly process the document
569+
if (value.size()) {
570+
if (mcbp::datatype::is_xattr(datatype)) {
571+
auto vb = engine_.getVBucket(vbucket);
572+
if (vb) {
573+
engine_.getKVBucket()->runPreExpiryHook(*vb, *item);
579574
}
575+
} else {
576+
// MB-31141: Deletes cannot have a value
577+
item->replaceValue(Blob::New(0));
578+
item->setDataType(PROTOCOL_BINARY_RAW_BYTES);
580579
}
580+
}
581581

582-
try {
583-
err = stream->messageReceived(
584-
std::make_unique<MutationConsumerMessage>(
585-
item,
586-
opaque,
587-
IncludeValue::Yes,
588-
IncludeXattrs::Yes,
589-
includeDeleteTime,
590-
key.getEncoding(),
591-
emd.release(),
592-
cb::mcbp::DcpStreamId{}));
593-
} catch (const std::bad_alloc&) {
594-
err = ENGINE_ENOMEM;
582+
ENGINE_ERROR_CODE err;
583+
std::unique_ptr<ExtendedMetaData> emd;
584+
if (meta.size() > 0) {
585+
emd = std::make_unique<ExtendedMetaData>(meta.data(), meta.size());
586+
if (emd->getStatus() == ENGINE_EINVAL) {
587+
err = ENGINE_EINVAL;
595588
}
589+
}
596590

597-
// The item was buffered and will be processed later
598-
if (err == ENGINE_TMPFAIL) {
599-
notifyVbucketReady(vbucket);
600-
}
591+
try {
592+
err = stream->messageReceived(std::make_unique<MutationConsumerMessage>(
593+
item,
594+
opaque,
595+
IncludeValue::Yes,
596+
IncludeXattrs::Yes,
597+
includeDeleteTime,
598+
key.getEncoding(),
599+
emd.release(),
600+
cb::mcbp::DcpStreamId{}));
601+
} catch (const std::bad_alloc&) {
602+
err = ENGINE_ENOMEM;
603+
}
604+
605+
// The item was buffered and will be processed later
606+
if (err == ENGINE_TMPFAIL) {
607+
notifyVbucketReady(vbucket);
601608
}
602609

603610
return err;
@@ -1648,12 +1655,6 @@ ENGINE_ERROR_CODE DcpConsumer::lookupStreamAndDispatchMessage(
16481655
return ENGINE_KEY_ENOENT;
16491656
}
16501657

1651-
if (!stream->isActive()) {
1652-
// Stream is not active - also uses KEY_ENOENT to indicate no valid
1653-
// stream.
1654-
return ENGINE_KEY_ENOENT;
1655-
}
1656-
16571658
// Pass the message to the associated stream.
16581659
ENGINE_ERROR_CODE err;
16591660
try {

engines/ep/src/dcp/passive_stream.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,12 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(
225225
}
226226

227227
if (!isActive()) {
228-
return ENGINE_KEY_ENOENT;
228+
// If the Stream isn't active, *but* the object is still receiving
229+
// messages from the DcpConsumer that means the stream is still
230+
// registered in the streams map and hence we should ignore any
231+
// messages (until STREAM_END is received and the stream is removed from
232+
// the map).
233+
return ENGINE_SUCCESS;
229234
}
230235

231236
auto seqno = dcpResponse->getBySeqno();

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4882,9 +4882,6 @@ static enum test_result test_dcp_consumer_end_stream(EngineIface* h) {
48824882
dcp->stream_end(cookie, stream_opaque, vbucket, end_flag),
48834883
"Expected success");
48844884

4885-
wait_for_stat_to_be(
4886-
h, "eq_dcpq:unittest:stream_0_state", std::string{"dead"}, "dcp");
4887-
48884885
testHarness->destroy_cookie(cookie);
48894886
return SUCCESS;
48904887
}

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3821,6 +3821,109 @@ TEST_F(SingleThreadedEPBucketTest, testValidTombstonePurgeOnRetainErroneousTombs
38213821
EXPECT_EQ(2, store->getVBucket(vbid)->getPurgeSeqno());
38223822
}
38233823

3824+
// MB-34850: Check that a consumer correctly handles (and ignores) stream-level
3825+
// messages (Mutation/Deletion/Prepare/Commit/Abort/...) received after
3826+
// CloseStream response but *before* the Producer sends STREAM_END.
3827+
TEST_F(SingleThreadedEPBucketTest,
3828+
MB_34850_ConsumerRecvMessagesAfterCloseStream) {
3829+
// Setup: Create replica VB and create stream for vbid.
3830+
// Have the consumer receive a snapshot marker(1..10), and then close the
3831+
// stream.
3832+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
3833+
auto consumer = std::make_shared<MockDcpConsumer>(*engine, cookie, "conn");
3834+
int opaque = 1;
3835+
ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, /*flags*/ 0));
3836+
ASSERT_EQ(ENGINE_SUCCESS,
3837+
consumer->snapshotMarker(opaque, vbid, 1, 10, MARKER_FLAG_CHK));
3838+
ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
3839+
3840+
// Test: Have the producer send further messages on the stream (before the
3841+
// final STREAM_END). These should all be accepted (but discarded) by the
3842+
// replica.
3843+
auto testAllStreamLevelMessages = [&consumer, this, opaque](
3844+
ENGINE_ERROR_CODE expected) {
3845+
auto key = makeStoredDocKey("key");
3846+
auto dtype = PROTOCOL_BINARY_RAW_BYTES;
3847+
EXPECT_EQ(expected,
3848+
consumer->mutation(opaque,
3849+
key,
3850+
{},
3851+
0,
3852+
dtype,
3853+
{},
3854+
vbid,
3855+
{},
3856+
1,
3857+
{},
3858+
{},
3859+
{},
3860+
{},
3861+
{}));
3862+
3863+
EXPECT_EQ(expected,
3864+
consumer->deletion(
3865+
opaque, key, {}, 0, dtype, {}, vbid, 2, {}, {}));
3866+
3867+
EXPECT_EQ(expected,
3868+
consumer->deletionV2(
3869+
opaque, key, {}, 0, dtype, {}, vbid, 3, {}, {}));
3870+
3871+
EXPECT_EQ(expected,
3872+
consumer->expiration(
3873+
opaque, key, {}, 0, dtype, {}, vbid, 4, {}, {}));
3874+
3875+
EXPECT_EQ(
3876+
expected,
3877+
consumer->setVBucketState(opaque, vbid, vbucket_state_active));
3878+
auto vb = engine->getKVBucket()->getVBucket(vbid);
3879+
EXPECT_EQ(vbucket_state_replica, vb->getState());
3880+
3881+
EXPECT_EQ(expected,
3882+
consumer->systemEvent(opaque,
3883+
vbid,
3884+
mcbp::systemevent::id::CreateCollection,
3885+
5,
3886+
mcbp::systemevent::version::version1,
3887+
{},
3888+
{}));
3889+
3890+
EXPECT_EQ(expected,
3891+
consumer->prepare(opaque,
3892+
key,
3893+
{},
3894+
0,
3895+
dtype,
3896+
{},
3897+
vbid,
3898+
{},
3899+
6,
3900+
{},
3901+
{},
3902+
{},
3903+
{},
3904+
{},
3905+
{}));
3906+
3907+
EXPECT_EQ(expected, consumer->commit(opaque, vbid, key, 6, 7));
3908+
3909+
EXPECT_EQ(expected, consumer->abort(opaque, vbid, key, 6, 7));
3910+
3911+
EXPECT_EQ(expected,
3912+
consumer->snapshotMarker(
3913+
opaque, vbid, 11, 11, MARKER_FLAG_CHK));
3914+
};
3915+
testAllStreamLevelMessages(ENGINE_SUCCESS);
3916+
3917+
// Setup (phase 2): Receive a STREAM_END message - after which all of the
3918+
// above stream-level messages should be rejected as ENOENT.
3919+
ASSERT_EQ(ENGINE_SUCCESS,
3920+
consumer->streamEnd(opaque, vbid, END_STREAM_CLOSED));
3921+
3922+
// Test (phase 2): Have the producer send all the above stream-level
3923+
// messages to the consumer. Should all be rejected this time.
3924+
testAllStreamLevelMessages(ENGINE_KEY_ENOENT);
3925+
}
3926+
38243927
TEST_P(STParameterizedBucketTest, produce_delete_times) {
38253928
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
38263929
auto t1 = ep_real_time();

0 commit comments

Comments
 (0)