Skip to content

Commit 81226d8

Browse files
committed
Merge MB-31481 from branch 'alice'
See http://review.couchbase.org/100241 for commit details Merge also updates test to work in master branch Change-Id: I7101e376b4609cb5e4f7e176760e8ccdb92515f3
2 parents 5a6eed4 + 66985a8 commit 81226d8

File tree

3 files changed

+118
-0
lines changed

3 files changed

+118
-0
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,11 @@ std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
469469
if (pendingBackfill) {
470470
scheduleBackfill_UNLOCKED(true);
471471
pendingBackfill = false;
472+
// After scheduling a backfill we may now have items in readyQ -
473+
// so re-check if we didn't already have a response.
474+
if (!resp) {
475+
resp = nextQueuedItem();
476+
}
472477
} else {
473478
if (lastReadSeqno.load() >= end_seqno_) {
474479
endStream(END_STREAM_OK);

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ class MockDcpProducer : public DcpProducer {
139139
return log.getBytesOutstanding();
140140
}
141141

142+
DcpReadyQueue& getReadyQueue() {
143+
return ready;
144+
}
145+
142146
/**
143147
* Place a mock active stream into the producer
144148
*/

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3440,6 +3440,115 @@ TEST_F(SingleThreadedEPBucketTest, MB_29541) {
34403440
producer->cancelCheckpointCreatorTask();
34413441
}
34423442

3443+
/* When a backfill is activated along with a slow stream trigger,
3444+
* the stream end message gets stuck in the readyQ as the stream is
3445+
* never notified as ready to send it. As the stream transitions state
3446+
* to InMemory as well as having sent all requested sequence numbers,
3447+
* the stream is meant to end but Stream::itemsReady can cause this
3448+
* to never trigger. This means that DCP consumers can hang waiting
3449+
* for this closure message.
3450+
* This test checks that the DCP stream actually sends the end stream
3451+
* message when triggering this problematic sequence.
3452+
*/
3453+
TEST_F(SingleThreadedEPBucketTest, MB_31481) {
3454+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
3455+
3456+
// 1) First store 2 keys which we will backfill
3457+
std::array<std::string, 2> keys = {{"k1", "k2"}};
3458+
store_item(vbid, makeStoredDocKey(keys[0]), keys[0]);
3459+
store_item(vbid, makeStoredDocKey(keys[1]), keys[1]);
3460+
3461+
flush_vbucket_to_disk(vbid, keys.size());
3462+
3463+
// Simplest way to ensure DCP has to do a backfill - 'wipe memory'
3464+
resetEngineAndWarmup();
3465+
3466+
// Setup DCP, 1 producer and we will do a takeover of the vbucket
3467+
auto producer = std::make_shared<MockDcpProducer>(*engine,
3468+
cookie,
3469+
"mb-31481",
3470+
/*flags*/ 0);
3471+
3472+
MockDcpMessageProducers producers(engine.get());
3473+
3474+
ASSERT_TRUE(producer->getReadyQueue().empty());
3475+
3476+
uint64_t rollbackSeqno = 0;
3477+
auto vb = store->getVBuckets().getBucket(vbid);
3478+
ASSERT_NE(nullptr, vb.get());
3479+
EXPECT_EQ(ENGINE_SUCCESS,
3480+
producer->streamRequest(0, // flags
3481+
1, // opaque
3482+
vbid,
3483+
0, // start_seqno
3484+
vb->getHighSeqno(), // end_seqno
3485+
vb->failovers->getLatestUUID(),
3486+
0, // snap_start_seqno
3487+
vb->getHighSeqno(), // snap_end_seqno
3488+
&rollbackSeqno,
3489+
&dcpAddFailoverLog,
3490+
{}));
3491+
3492+
auto vb0Stream =
3493+
dynamic_cast<ActiveStream*>(producer->findStream(vbid).get());
3494+
ASSERT_NE(nullptr, vb0Stream);
3495+
3496+
// Manually drive the backfill (not using notifyAndStepToCheckpoint)
3497+
auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
3498+
// Trigger slow stream handle
3499+
ASSERT_TRUE(vb0Stream->handleSlowStream());
3500+
// backfill:create()
3501+
runNextTask(lpAuxioQ);
3502+
// backfill:scan()
3503+
runNextTask(lpAuxioQ);
3504+
3505+
ASSERT_TRUE(producer->getReadyQueue().exists(vbid));
3506+
3507+
// Now drain all items before we proceed to complete, which triggers disk
3508+
// snapshot.
3509+
ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
3510+
ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_SNAPSHOT_MARKER, dcp_last_op);
3511+
for (const auto& key : keys) {
3512+
ASSERT_EQ(ENGINE_SUCCESS, producer->step(&producers));
3513+
ASSERT_EQ(PROTOCOL_BINARY_CMD_DCP_MUTATION, dcp_last_op);
3514+
ASSERT_EQ(key, dcp_last_key);
3515+
}
3516+
3517+
// Another producer step should report EWOULDBLOCK (no more data) as all
3518+
// items have been backfilled.
3519+
EXPECT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));
3520+
// Also the readyQ should be empty
3521+
EXPECT_TRUE(producer->getReadyQueue().empty());
3522+
3523+
// backfill:complete()
3524+
runNextTask(lpAuxioQ);
3525+
3526+
// Notified to allow stream to transition to in-memory phase.
3527+
EXPECT_TRUE(producer->getReadyQueue().exists(vbid));
3528+
3529+
// Step should cause stream closed message, previously this would
3530+
// keep the "ENGINE_EWOULDBLOCK" response due to the itemsReady flag,
3531+
// which is not expected with that message already being in the readyQ.
3532+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(&producers));
3533+
EXPECT_EQ(PROTOCOL_BINARY_CMD_DCP_STREAM_END, dcp_last_op);
3534+
3535+
// Stepping forward should now show that stream end message has been
3536+
// completed and no more messages are needed to send.
3537+
EXPECT_EQ(ENGINE_EWOULDBLOCK, producer->step(&producers));
3538+
3539+
// Similarly, the readyQ should be empty again
3540+
EXPECT_TRUE(producer->getReadyQueue().empty());
3541+
3542+
// backfill:finished() - just to cleanup.
3543+
runNextTask(lpAuxioQ);
3544+
3545+
// vb0Stream should be closed
3546+
EXPECT_FALSE(vb0Stream->isActive());
3547+
3548+
// Stop Producer checkpoint processor task
3549+
producer->cancelCheckpointCreatorTask();
3550+
}
3551+
34433552
void SingleThreadedEPBucketTest::producerReadyQLimitOnBackfill(
34443553
const BackfillBufferLimit limitType) {
34453554
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

0 commit comments

Comments
 (0)