Skip to content

Commit 66985a8

Browse files
CSubmergeddave-finlay
authored andcommitted
MB-31481: Don't skip STREAM_END msg after cursor drop during backfill
For DCP producers needing backfill, There's an edge-case in handling cursor dropping: - When a backfill is re-scheduled due to a slow stream trigger (cursor dropping), if the second backfill isn't necessary (as the required items are still in the checkpoing manager) then the second backfill is skipped and the stream transition to in-memory. - If the in-memory stream then doesn't need to fetch anymore data (as no more mutations have occurred), then the stream advances directly to endStream - see ActiveStream::transitionState(): case StreamState::InMemory: // Check if the producer has sent up till the last requested // sequence number already, if not - move checkpoint items into // the ready queue. if (lastSentSeqno.load() >= end_seqno_) { // Stream transitioning to DEAD state endStream(END_STREAM_OK); /// <--- HERE notifyStreamReady(); } However, there is a bug where this END_STREAM message is never transmitted to the DCP client, resulting in the stream hanging. The problem is that stream end message gets stuck in the readyQ, as the stream is never notified as ready to send it. This is due to the expectation of Stream::next() dealing with queued items being broken - if next() retuns null then the itemsReady flag is cleared as it assumes a null retudn means no more items are available. However in this situation there /are/ items ready, but not until transitionState() is called. Following from this, the stream is never notified so doesnt send a closure message, which means that DCP consumers can hang waiting for this closure message. This patch triggers nextQueuedItem inside the pendingBackfill case to ensure that the end stream message is notified and executed. The patch also includes a test to follow this problematic sequence. Change-Id: I28885cca1e853b72cc886f756000190b903d1d53 Reviewed-on: http://review.couchbase.org/100241 Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Well-Formed: Build Bot <[email protected]>
1 parent 9444e64 commit 66985a8

File tree

3 files changed

+116
-0
lines changed

3 files changed

+116
-0
lines changed

engines/ep/src/dcp/stream.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,11 @@ std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
676676
if (pendingBackfill) {
677677
scheduleBackfill_UNLOCKED(true);
678678
pendingBackfill = false;
679+
// After scheduling a backfill we may now have items in readyQ -
680+
// so re-check if we didn't already have a response.
681+
if (!resp) {
682+
resp = nextQueuedItem();
683+
}
679684
} else {
680685
if (lastReadSeqno.load() >= end_seqno_) {
681686
endStream(END_STREAM_OK);

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ class MockDcpProducer : public DcpProducer {
146146
return log.getBytesOutstanding();
147147
}
148148

149+
DcpReadyQueue& getReadyQueue() {
150+
return ready;
151+
}
152+
149153
/**
150154
* Place a mock active stream into the producer
151155
*/

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3479,6 +3479,113 @@ TEST_F(SingleThreadedEPBucketTest, MB_29541) {
34793479
producer->cancelCheckpointCreatorTask();
34803480
}
34813481

3482+
/* When a backfill is activated along with a slow stream trigger,
3483+
* the stream end message gets stuck in the readyQ as the stream is
3484+
* never notified as ready to send it. As the stream transitions state
3485+
* to InMemory as well as having sent all requested sequence numbers,
3486+
* the stream is meant to end but Stream::itemsReady can cause this
3487+
* to never trigger. This means that DCP consumers can hang waiting
3488+
* for this closure message.
3489+
* This test checks that the DCP stream actually sends the end stream
3490+
* message when triggering this problematic sequence.
3491+
*/
3492+
TEST_F(SingleThreadedEPBucketTest, MB_31481) {
3493+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3494+
3495+
// 1) First store 2 keys which we will backfill
3496+
std::array<std::string, 2> keys = {{"k1", "k2"}};
3497+
store_item(vbid, makeStoredDocKey(keys[0]), keys[0]);
3498+
store_item(vbid, makeStoredDocKey(keys[1]), keys[1]);
3499+
3500+
flush_vbucket_to_disk(vbid, keys.size());
3501+
3502+
// Simplest way to ensure DCP has to do a backfill - 'wipe memory'
3503+
resetEngineAndWarmup();
3504+
3505+
// Setup DCP, 1 producer and we will do a takeover of the vbucket
3506+
auto producer = createDcpProducer(cookie, {}, false, IncludeDeleteTime::No);
3507+
3508+
auto producers = get_dcp_producers(
3509+
reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
3510+
reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
3511+
3512+
ASSERT_TRUE(producer->getReadyQueue().empty());
3513+
3514+
uint64_t rollbackSeqno = 0;
3515+
auto vb = store->getVBuckets().getBucket(vbid);
3516+
ASSERT_NE(nullptr, vb.get());
3517+
EXPECT_EQ(ENGINE_SUCCESS,
3518+
producer->streamRequest(0, // flags
3519+
1, // opaque
3520+
vbid,
3521+
0, // start_seqno
3522+
vb->getHighSeqno(), // end_seqno
3523+
vb->failovers->getLatestUUID(),
3524+
0, // snap_start_seqno
3525+
vb->getHighSeqno(), // snap_end_seqno
3526+
&rollbackSeqno,
3527+
&dcpAddFailoverLog));
3528+
3529+
auto vb0Stream =
3530+
dynamic_cast<ActiveStream*>(producer->findStream(vbid).get());
3531+
ASSERT_NE(nullptr, vb0Stream);
3532+
3533+
// Manually drive the backfill (not using notifyAndStepToCheckpoint)
3534+
auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
3535+
// Trigger slow stream handle
3536+
ASSERT_TRUE(vb0Stream->handleSlowStream());
3537+
// backfill:create()
3538+
runNextTask(lpAuxioQ);
3539+
// backfill:scan()
3540+
runNextTask(lpAuxioQ);
3541+
3542+
ASSERT_TRUE(producer->getReadyQueue().exists(vbid));
3543+
3544+
// Now drain all items before we proceed to complete, which triggers disk
3545+
// snapshot.
3546+
ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3547+
ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER, dcp_last_op);
3548+
for (const auto& key : keys) {
3549+
ASSERT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3550+
ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3551+
ASSERT_EQ(key, dcp_last_key);
3552+
}
3553+
3554+
// Another producer step should report SUCCESS (no more data) as all items
3555+
// have been backfilled.
3556+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3557+
// Also the readyQ should be empty
3558+
EXPECT_TRUE(producer->getReadyQueue().empty());
3559+
3560+
// backfill:complete()
3561+
runNextTask(lpAuxioQ);
3562+
3563+
// Notified to allow stream to transition to in-memory phase.
3564+
EXPECT_TRUE(producer->getReadyQueue().exists(vbid));
3565+
3566+
// Step should cause stream closed message, previously this would
3567+
// keep the "ENGINE_SUCCESS" response due to the itemsReady flag,
3568+
// which is not expected with that message already being in the readyQ.
3569+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
3570+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
3571+
3572+
// Stepping forward should now show that stream end message has been
3573+
// completed and no more messages are needed to send.
3574+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
3575+
3576+
// Similarly, the readyQ should be empty again
3577+
EXPECT_TRUE(producer->getReadyQueue().empty());
3578+
3579+
// backfill:finished() - just to cleanup.
3580+
runNextTask(lpAuxioQ);
3581+
3582+
// vb0Stream should be closed
3583+
EXPECT_FALSE(vb0Stream->isActive());
3584+
3585+
// Stop Producer checkpoint processor task
3586+
producer->cancelCheckpointCreatorTask();
3587+
}
3588+
34823589
INSTANTIATE_TEST_CASE_P(XattrSystemUserTest,
34833590
XattrSystemUserTest,
34843591
::testing::Bool(), );

0 commit comments

Comments
 (0)