Skip to content

Commit 51f35ef

Browse files
committed
MB-37468: Force notification of stream completing backfill
A stream completing a backfill may race with a stepping producer. The stream may have just completed the backfill (scanned all items but not yet completed execution of the completeBackfill function) whilst the producer steps and finds nothing in the readyQueue (already processed all items). If the completing backfill then does not notify the stream, because the itemsReady flag has not yet been reset by the stepping producer, the producer will remove the stream from its VBReadyQueue and not process it again. Nothing else would notify the stream, causing us to fail to call endStream or transitionState. Change-Id: Ib430ef78261b910616576572bf76cd97c1cf47be Reviewed-on: http://review.couchbase.org/120292 Reviewed-by: James Harrison <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]> Well-Formed: Build Bot <[email protected]>
1 parent 92f1f76 commit 51f35ef

File tree

4 files changed

+142
-2
lines changed

4 files changed

+142
-2
lines changed

engines/ep/src/dcp/stream.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,10 @@ std::unique_ptr<DcpResponse> ActiveStream::next(
374374
break;
375375
}
376376

377+
if (nextHook) {
378+
nextHook();
379+
}
380+
377381
itemsReady.store(response ? true : false);
378382
return response;
379383
}
@@ -558,9 +562,21 @@ void ActiveStream::completeBackfill() {
558562
}
559563
}
560564

565+
if (completeBackfillHook) {
566+
completeBackfillHook();
567+
}
568+
561569
bool inverse = true;
562570
isBackfillTaskRunning.compare_exchange_strong(inverse, false);
563-
notifyStreamReady();
571+
572+
// MB-37468: Items may not be ready, but we need to notify the stream
573+
// regardless as a racing stepping producer that had just finished
574+
// processing all items and found an empty ready queue could clear the flag
575+
// immediately after we call notifyStreamReady (which does not notify as
576+
// itemsReady is true). This would then result in us not notifying the
577+
// stream and not putting it back in the producer's readyQueue. A similar
578+
// case exists for transitioning state to TakeoverSend or InMemory.
579+
notifyStreamReady(true);
564580
}
565581

566582
void ActiveStream::snapshotMarkerAckReceived() {

engines/ep/src/dcp/stream.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,10 @@ class ActiveStream : public Stream,
410410

411411
std::unique_ptr<DcpResponse> backfillPhase(std::lock_guard<std::mutex>& lh);
412412

413+
// MB-37468: Test-only hooks set via Mock class
414+
std::function<void()> completeBackfillHook;
415+
std::function<void()> nextHook;
416+
413417
private:
414418
std::unique_ptr<DcpResponse> next(std::lock_guard<std::mutex>& lh);
415419

engines/ep/tests/mock/mock_stream.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@ class MockActiveStream : public ActiveStream {
160160
std::function<void()> preGetOutstandingItemsCallback = [] { return; };
161161

162162
bool isDead() { return ActiveStream::getState() == StreamState::Dead; };
163+
164+
void setCompleteBackfillHook(std::function<void()> hook) {
165+
completeBackfillHook = hook;
166+
}
167+
168+
void setNextHook(std::function<void()> hook) {
169+
nextHook = hook;
170+
}
163171
};
164172

165173
/**

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4268,6 +4268,118 @@ TEST_P(SingleThreadedActiveStreamTest,
42684268
EXPECT_EQ(ActiveStream::StreamState::InMemory, stream->getState());
42694269
}
42704270

4271+
// MB-37468: A stepping producer that has found no items (backfill fully
4272+
// processed) can race with a completing backfill in such a way that we fail to
4273+
// notify the producer that the stream needs further processing. This causes us
4274+
// to fail to send a StreamEnd message. A similar case exists for transitioning
4275+
// state to TakeoverSend or InMemory.
4276+
TEST_P(SingleThreadedActiveStreamTest, CompleteBackfillRaceNoStreamEnd) {
4277+
auto vb = engine->getVBucket(vbid);
4278+
auto& ckptMgr = *vb->checkpointManager;
4279+
4280+
// Delete initial stream (so we can re-create after items are available
4281+
// from backing store).
4282+
stream.reset();
4283+
4284+
// Add an item, flush it to disk, then clear checkpoint to force backfill.
4285+
store_item(vbid, makeStoredDocKey("key1"), "value");
4286+
ckptMgr.createNewCheckpoint();
4287+
4288+
flushVBucketToDiskIfPersistent(vbid, 1);
4289+
bool newCKptCreated;
4290+
ASSERT_EQ(1, ckptMgr.removeClosedUnrefCheckpoints(*vb, newCKptCreated));
4291+
4292+
// Re-create stream now we have items only on disk. We want to stream up
4293+
// to seqno 1 (our only item) to test that we get the StreamEnd message.
4294+
stream = producer->mockActiveStreamRequest(0 /*flags*/,
4295+
0 /*opaque*/,
4296+
*vb,
4297+
0 /*st_seqno*/,
4298+
1 /*en_seqno*/,
4299+
0x0 /*vb_uuid*/,
4300+
0 /*snap_start_seqno*/,
4301+
~0 /*snap_end_seqno*/);
4302+
ASSERT_TRUE(stream->isBackfilling());
4303+
4304+
auto producers = get_dcp_producers(
4305+
reinterpret_cast<ENGINE_HANDLE*>(engine.get()),
4306+
reinterpret_cast<ENGINE_HANDLE_V1*>(engine.get()));
4307+
4308+
// Step to schedule our backfill
4309+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
4310+
EXPECT_EQ(0, stream->public_readyQ().size());
4311+
4312+
auto& bfm = producer->getBFM();
4313+
4314+
// Ephemeral has a single stage backfill and we only care about the
4315+
// complete stage so skip over create and scan for persistent buckets
4316+
if (persistent()) {
4317+
bfm.backfill();
4318+
bfm.backfill();
4319+
}
4320+
4321+
ThreadGate tg1 (2);
4322+
ThreadGate tg2 (2);
4323+
std::thread t1;
4324+
stream->setCompleteBackfillHook([this, &t1, &tg1, &tg2, &producers]() {
4325+
// Step past our normal items to expose the race with backfill complete
4326+
// and an empty readyQueue.
4327+
4328+
EXPECT_EQ(1, *stream->getNumBackfillItemsRemaining());
4329+
EXPECT_EQ(2, stream->public_readyQ().size());
4330+
4331+
// Step snapshot marker
4332+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
4333+
EXPECT_EQ((uint8_t)cb::mcbp::ClientOpcode::DcpSnapshotMarker,
4334+
dcp_last_op);
4335+
4336+
// Step mutation
4337+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
4338+
EXPECT_EQ((uint8_t)cb::mcbp::ClientOpcode::DcpMutation, dcp_last_op);
4339+
4340+
stream->setNextHook([&tg1, &tg2]() {
4341+
if (!tg1.isComplete()) {
4342+
tg1.threadUp();
4343+
4344+
// Wait for the completeBackfill thread to have attempted to
4345+
// notify that the stream is ready before exiting the hook and
4346+
// setting itemsReady.
4347+
tg2.threadUp();
4348+
}
4349+
});
4350+
4351+
4352+
// Run the step in a different thread
4353+
t1 = std::thread{[this, &producers]() {
4354+
// This step should produce the stream end
4355+
EXPECT_EQ(ENGINE_WANT_MORE, producer->step(producers.get()));
4356+
EXPECT_EQ((uint8_t)cb::mcbp::ClientOpcode::DcpStreamEnd, dcp_last_op);
4357+
}};
4358+
4359+
// Wait for the stepping thread to have reached the point at which it is
4360+
// about to set itemsReady before we attempt to set itemsReady after we
4361+
// exit this hook.
4362+
tg1.threadUp();
4363+
});
4364+
4365+
// Complete the backfill to expose the race condition
4366+
bfm.backfill();
4367+
4368+
// Unblock the stepping thread to now find the stream end
4369+
tg2.threadUp();
4370+
4371+
t1.join();
4372+
4373+
// Should have sent StreamEnd but stream still in queue
4374+
EXPECT_FALSE(producer->findStream(vbid)->isActive());
4375+
EXPECT_FALSE(producer->getReadyQueue().empty());
4376+
4377+
// Step to remove stream from queue
4378+
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
4379+
EXPECT_FALSE(producer->findStream(vbid)->isActive());
4380+
EXPECT_TRUE(producer->getReadyQueue().empty());
4381+
}
4382+
42714383
INSTANTIATE_TEST_CASE_P(AllBucketTypes,
42724384
SingleThreadedActiveStreamTest,
4273-
allConfigValues, );
4385+
allConfigValues, );

0 commit comments

Comments
 (0)