Skip to content

Commit 0286542

Browse files
owend74daverigby
authored andcommitted
[BP] MB-25798: Backfill task leave stream state unchanged
The backfill task must not change the state machine of the active stream that it is associated with. In particular: 1) In ActiveStream::markDiskSnapshot if a vbucket is not found then it should just return, leaving the entity deleting the vbucket to set the stream to dead. 2) ActiveStream::completeBackfill should not call scheduleBackfill_UNLOCKED as this can move the stream into the STREAM_IN_MEMORY state. State machine changes should only be driven by ActiveStream::next, which is invoked by DcpProducer::getNextItem. The call to scheduleBackfill_UNLOCKED that was invoked in ActiveStream::completeBackfill when the pendingBackfill flag was true has been moved to the ActiveStream::backfillPhase function and is invoked once the current backfill has completed and the pendingBackfill flag is set to true. Change-Id: Idcbf8164792f4fd09898fe90748424687c60fb6a Reviewed-on: http://review.couchbase.org/83524 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent a2d57ae commit 0286542

File tree

3 files changed

+47
-43
lines changed

3 files changed

+47
-43
lines changed

src/dcp/stream.cc

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ DcpResponse* ActiveStream::next() {
238238
break;
239239
case STREAM_BACKFILLING:
240240
validTransition = true;
241-
response = backfillPhase();
241+
response = backfillPhase(lh);
242242
break;
243243
case STREAM_IN_MEMORY:
244244
validTransition = true;
@@ -283,16 +283,26 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
283283
uint64_t chkCursorSeqno = endSeqno;
284284

285285
if (state_ != STREAM_BACKFILLING) {
286+
producer->getLogger().log(EXTENSION_LOG_WARNING,
287+
"(vb %" PRIu16 ") ActiveStream::"
288+
"markDiskSnapshot: Unexpected state_:%s",
289+
vb_, stateName(state_));
286290
return;
287291
}
288292

289293
startSeqno = std::min(snap_start_seqno_, startSeqno);
290294
firstMarkerSent = true;
291295

292296
RCPtr<VBucket> vb = engine->getVBucket(vb_);
297+
if (!vb) {
298+
producer->getLogger().log(EXTENSION_LOG_WARNING,"(vb %" PRIu16 ") "
299+
"ActiveStream::markDiskSnapshot, vbucket "
300+
"does not exist", vb_);
301+
return;
302+
}
293303
// An atomic read of vbucket state without acquiring the
294304
// reader lock for state should suffice here.
295-
if (vb && vb->getState() == vbucket_state_replica) {
305+
if (vb->getState() == vbucket_state_replica) {
296306
if (end_seqno_ > endSeqno) {
297307
/* We possibly have items in the open checkpoint
298308
(incomplete snapshot) */
@@ -314,9 +324,7 @@ void ActiveStream::markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno) {
314324
MARKER_FLAG_DISK));
315325
lastSentSnapEndSeqno.store(endSeqno, std::memory_order_relaxed);
316326

317-
if (!vb) {
318-
endStream(END_STREAM_STATE);
319-
} else if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
327+
if (!(flags_ & DCP_ADD_STREAM_FLAG_DISKONLY)) {
320328
// Only re-register the cursor if we still need to get memory snapshots
321329
CursorRegResult result =
322330
vb->checkpointManager.registerCursorBySeqno(
@@ -384,26 +392,10 @@ void ActiveStream::completeBackfill() {
384392
uint64_t(backfillItems.memory.load()),
385393
lastReadSeqno.load(),
386394
pendingBackfill ? "True" : "False");
387-
388-
isBackfillTaskRunning = false;
389-
if (pendingBackfill) {
390-
scheduleBackfill_UNLOCKED(true);
391-
pendingBackfill = false;
392-
}
393-
394-
bool expected = false;
395-
if (itemsReady.compare_exchange_strong(expected, true)) {
396-
producer->notifyStreamReady(vb_);
397-
}
398-
399-
/**
400-
* MB-22451: It is important that we return here because
401-
* scheduleBackfill_UNLOCKED(true) can set
402-
* isBackfillTaskRunning to true. Therefore if we don't return we
403-
* will set isBackfillTaskRunning prematurely back to false, (see
404-
* below).
405-
*/
406-
return;
395+
} else {
396+
producer->getLogger().log(EXTENSION_LOG_WARNING,
397+
"(vb %" PRIu16 ") ActiveStream::completeBackfill: "
398+
"Unexpected state_:%s", vb_, stateName(state_));
407399
}
408400
}
409401

@@ -488,7 +480,7 @@ void ActiveStream::setVBucketStateAckRecieved() {
488480
}
489481
}
490482

491-
DcpResponse* ActiveStream::backfillPhase() {
483+
DcpResponse* ActiveStream::backfillPhase(LockHolder& lh) {
492484
DcpResponse* resp = nextQueuedItem();
493485

494486
if (resp && (resp->getEvent() == DCP_MUTATION ||
@@ -504,19 +496,27 @@ DcpResponse* ActiveStream::backfillPhase() {
504496
}
505497

506498
if (!isBackfillTaskRunning && readyQ.empty()) {
499+
// Given readyQ.empty() is True resp will be NULL
507500
backfillRemaining.store(0, std::memory_order_relaxed);
508-
if (lastReadSeqno.load() >= end_seqno_) {
509-
endStream(END_STREAM_OK);
510-
} else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
511-
transitionState(STREAM_TAKEOVER_SEND);
512-
} else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
513-
endStream(END_STREAM_OK);
501+
// The previous backfill has completed. Check to see if another
502+
// backfill needs to be scheduled.
503+
if (pendingBackfill) {
504+
scheduleBackfill_UNLOCKED(true);
505+
pendingBackfill = false;
514506
} else {
515-
transitionState(STREAM_IN_MEMORY);
516-
}
507+
if (lastReadSeqno.load() >= end_seqno_) {
508+
endStream(END_STREAM_OK);
509+
} else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
510+
transitionState(STREAM_TAKEOVER_SEND);
511+
} else if (flags_ & DCP_ADD_STREAM_FLAG_DISKONLY) {
512+
endStream(END_STREAM_OK);
513+
} else {
514+
transitionState(STREAM_IN_MEMORY);
515+
}
517516

518-
if (!resp) {
519-
resp = nextQueuedItem();
517+
if (!resp) {
518+
resp = nextQueuedItem();
519+
}
520520
}
521521
}
522522

@@ -528,6 +528,8 @@ DcpResponse* ActiveStream::inMemoryPhase() {
528528
endStream(END_STREAM_OK);
529529
} else if (readyQ.empty()) {
530530
if (pendingBackfill) {
531+
// Moving the state from STREAM_IN_MEMORY to
532+
// STREAM_BACKFILLING will result in a backfill being scheduled
531533
transitionState(STREAM_BACKFILLING);
532534
pendingBackfill = false;
533535
return NULL;

src/dcp/stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ class ActiveStream : public Stream {
281281

282282
private:
283283

284-
DcpResponse* backfillPhase();
284+
DcpResponse* backfillPhase(LockHolder& lh);
285285

286286
DcpResponse* inMemoryPhase();
287287

tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ class SingleThreadedEPStoreTest : public EventuallyPersistentStoreTest {
123123
/**
124124
* Regression test for MB-22451: When handleSlowStream is called and in
125125
* STREAM_BACKFILLING state and currently have a backfill scheduled (or running)
126-
* ensure that when the backfill completes the new backfill is scheduled and
127-
* the backfilling flag remains true.
126+
* ensure that when the backfill completes pendingBackfill remains true,
127+
* isBackfillTaskRunning is false and, the stream state remains set to
128+
* STREAM_BACKFILLING.
128129
*/
129130
TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
130131
// Make vbucket active.
@@ -168,11 +169,12 @@ TEST_F(SingleThreadedEPStoreTest, test_mb22451) {
168169
// The call to handleSlowStream should result in setting pendingBackfill
169170
// flag to true
170171
EXPECT_TRUE(mock_stream->public_getPendingBackfill())
171-
<< "pendingBackfill is not true";
172+
<< "handleSlowStream should set pendingBackfill to True";
172173
mock_stream->completeBackfill();
173-
EXPECT_TRUE(mock_stream->public_isBackfillTaskRunning())
174-
<< "isBackfillRunning is not true";
175-
174+
EXPECT_FALSE(mock_stream->public_isBackfillTaskRunning())
175+
<< "completeBackfill should set isBackfillTaskRunning to False";
176+
EXPECT_EQ(STREAM_BACKFILLING, mock_stream->getState())
177+
<< "stream state should not have changed";
176178
// Required to ensure that the backfillMgr is deleted
177179
producer->closeAllStreams();
178180
}

0 commit comments

Comments
 (0)