Skip to content

Commit b1bbe30

Browse files
owend74daverigby
authored andcommitted
[BP] MB-36802: Don't return estimate=0 for dcp-takeover stats before backfill
[Backport of MB-35594.] The 'dcp-takeover' stats are used by ns_server to estimate how many mutations are remaining on a DCP stream. However, the estimate value is not updated until the backfill task has run once (and scanned the disk file). As such, if 'dcp-takeover' stats are requested before that first backfil task has run, then they can incorrectly report '0' backfill items. To address this, change backfillRemaining to be of type boost::optional, initialized to an empty optional. Only when the backfill scan has completed (when the number of items remaining is determined) is the optional populated. Then, when stats are requested use a new status value "calculating-item-count" if the optional is empty (i.e. before scan). Change-Id: Ia11dfe830ebd690bb40884594992acbcd21c104d Reviewed-on: http://review.couchbase.org/118410 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 12fd1db commit b1bbe30

File tree

9 files changed

+242
-46
lines changed

9 files changed

+242
-46
lines changed

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ backfill_status_t DCPBackfillDisk::create() {
256256
stream->setDead(status);
257257
transitionState(backfill_state_done);
258258
} else {
259-
stream->incrBackfillRemaining(scanCtx->documentCount);
259+
stream->setBackfillRemaining(scanCtx->documentCount);
260260
stream->markDiskSnapshot(startSeqno, scanCtx->maxSeqno);
261261
transitionState(backfill_state_scanning);
262262
}

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ backfill_status_t DCPBackfillMemory::run() {
9090
}
9191

9292
/* Put items onto readyQ of the DCP stream */
93-
stream->incrBackfillRemaining(items.size());
93+
stream->setBackfillRemaining(items.size());
9494

9595
/* Mark disk snapshot */
9696
stream->markDiskSnapshot(startSeqno, adjustedEndSeqno);
@@ -225,11 +225,11 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
225225
remaining count */
226226
while (rangeItr.curr() != rangeItr.end()) {
227227
if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
228-
/* Incr backfill remaining
228+
/* Set backfill remaining
229229
[EPHE TODO]: This will be inaccurate if do not backfill till end
230230
of the iterator
231231
*/
232-
stream->incrBackfillRemaining(rangeItr.count());
232+
stream->setBackfillRemaining(rangeItr.count());
233233

234234
/* Determine the endSeqno of the current snapshot.
235235
We want to send till requested endSeqno, but if that cannot

engines/ep/src/dcp/stream.cc

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ ActiveStream::ActiveStream(EventuallyPersistentEngine* e,
256256
isBackfillTaskRunning(false),
257257
pendingBackfill(false),
258258
lastReadSeqno(st_seqno),
259-
backfillRemaining(0),
259+
backfillRemaining(),
260260
lastReadSeqnoUnSnapshotted(st_seqno),
261261
lastSentSeqno(st_seqno),
262262
curChkSeqno(st_seqno),
@@ -635,6 +635,16 @@ void ActiveStream::setVBucketStateAckRecieved() {
635635
notifyStreamReady();
636636
}
637637

638+
void ActiveStream::setBackfillRemaining(size_t value) {
639+
std::lock_guard<std::mutex> guard(streamMutex);
640+
backfillRemaining = value;
641+
}
642+
643+
void ActiveStream::clearBackfillRemaining() {
644+
std::lock_guard<std::mutex> guard(streamMutex);
645+
backfillRemaining.reset();
646+
}
647+
638648
std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
639649
std::lock_guard<std::mutex>& lh) {
640650
auto resp = nextQueuedItem();
@@ -662,15 +672,15 @@ std::unique_ptr<DcpResponse> ActiveStream::backfillPhase(
662672
// Only DcpResponse objects representing items from "disk" have a size
663673
// so only update backfillRemaining when non-zero
664674
if (resp->getApproximateSize()) {
665-
if (backfillRemaining.load(std::memory_order_relaxed) > 0) {
666-
backfillRemaining.fetch_sub(1, std::memory_order_relaxed);
675+
Expects(backfillRemaining.is_initialized());
676+
if (*backfillRemaining > 0) {
677+
(*backfillRemaining)--;
667678
}
668679
}
669680
}
670681

671682
if (!isBackfillTaskRunning && readyQ.empty()) {
672683
// Given readyQ.empty() is True resp will be NULL
673-
backfillRemaining.store(0, std::memory_order_relaxed);
674684
// The previous backfill has completed. Check to see if another
675685
// backfill needs to be scheduled.
676686
if (pendingBackfill) {
@@ -875,15 +885,24 @@ void ActiveStream::addTakeoverStats(ADD_STAT add_stat, const void *cookie,
875885
return;
876886
}
877887

878-
size_t total = backfillRemaining.load(std::memory_order_relaxed);
888+
size_t total = 0;
889+
const char* status = nullptr;
879890
if (isBackfilling()) {
880-
add_casted_stat("status", "backfilling", add_stat, cookie);
891+
if (backfillRemaining) {
892+
status = "backfilling";
893+
total += *backfillRemaining;
894+
} else {
895+
status = "calculating-item-count";
896+
}
881897
} else {
882-
add_casted_stat("status", "in-memory", add_stat, cookie);
898+
status = "in-memory";
899+
}
900+
add_casted_stat("status", status, add_stat, cookie);
901+
902+
if (backfillRemaining) {
903+
add_casted_stat(
904+
"backfillRemaining", *backfillRemaining, add_stat, cookie);
883905
}
884-
add_casted_stat("backfillRemaining",
885-
backfillRemaining.load(std::memory_order_relaxed),
886-
add_stat, cookie);
887906

888907
size_t vb_items = vb.getNumItems();
889908
size_t chk_items = 0;
@@ -1507,6 +1526,9 @@ void ActiveStream::scheduleBackfill_UNLOCKED(bool reschedule) {
15071526
producer->scheduleBackfillManager(
15081527
*vbucket, shared_from_this(), backfillStart, backfillEnd);
15091528
isBackfillTaskRunning.store(true);
1529+
/// Number of backfill items is unknown until the Backfill task
1530+
/// completes the scan phase - reset backfillRemaining counter.
1531+
backfillRemaining.reset();
15101532
} else {
15111533
if (reschedule) {
15121534
// Infrequent code path, see comment below.

engines/ep/src/dcp/stream.h

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,12 @@ class ActiveStream : public Stream,
250250

251251
void setVBucketStateAckRecieved();
252252

253-
void incrBackfillRemaining(size_t by) {
254-
backfillRemaining.fetch_add(by, std::memory_order_relaxed);
255-
}
253+
/// Set the number of backfill items remaining to the given value.
254+
void setBackfillRemaining(size_t value);
255+
256+
/// Clears the number of backfill items remaining, setting to an empty
257+
/// (unknown) value.
258+
void clearBackfillRemaining();
256259

257260
void markDiskSnapshot(uint64_t startSeqno, uint64_t endSeqno);
258261

@@ -397,12 +400,13 @@ class ActiveStream : public Stream,
397400
snapshotted and put onto readyQ */
398401
AtomicMonotonic<uint64_t, ThrowExceptionPolicy> lastReadSeqno;
399402

400-
/* backfillRemaining is a stat recording the amount of
401-
* items remaining to be read from disk. It is an atomic
402-
* because otherwise the function incrBackfillRemaining
403-
* must acquire the streamMutex lock.
403+
/* backfillRemaining is a stat recording the amount of items remaining to
404+
* be read from disk.
405+
* Before the number of items to be backfilled has been determined (disk
406+
* scanned) it is empty.
407+
* Guarded by streamMutex.
404408
*/
405-
std::atomic<size_t> backfillRemaining;
409+
boost::optional<size_t> backfillRemaining;
406410

407411
std::unique_ptr<DcpResponse> backfillPhase(std::lock_guard<std::mutex>& lh);
408412

engines/ep/tests/ep_test_apis.cc

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,24 +1503,6 @@ void wait_for_stat_to_be_gte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
15031503
}
15041504
}
15051505

1506-
void wait_for_stat_to_be_lte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
1507-
const char *stat, int final,
1508-
const char* stat_key,
1509-
const time_t max_wait_time_in_secs) {
1510-
useconds_t sleepTime = 128;
1511-
WaitTimeAccumulator<int> accumulator("to be less than or equal to", stat,
1512-
stat_key, final,
1513-
max_wait_time_in_secs);
1514-
for (;;) {
1515-
auto current = get_int_stat(h, h1, stat, stat_key);
1516-
if (current <= final) {
1517-
break;
1518-
}
1519-
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
1520-
decayingSleep(&sleepTime);
1521-
}
1522-
}
1523-
15241506
void wait_for_expired_items_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
15251507
int final,
15261508
const time_t max_wait_time_in_secs) {

engines/ep/tests/ep_test_apis.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,10 +435,15 @@ void wait_for_stat_to_be_gte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
435435
const char *stat, int final,
436436
const char* stat_key = NULL,
437437
const time_t max_wait_time_in_secs = 60);
438-
void wait_for_stat_to_be_lte(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
439-
const char *stat, int final,
438+
439+
template <typename T>
440+
void wait_for_stat_to_be_lte(ENGINE_HANDLE* h,
441+
ENGINE_HANDLE_V1* h1,
442+
const char* stat,
443+
T final,
440444
const char* stat_key = NULL,
441445
const time_t max_wait_time_in_secs = 60);
446+
442447
void wait_for_expired_items_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
443448
int final,
444449
const time_t max_wait_time_in_secs = 60);
@@ -641,6 +646,29 @@ void wait_for_stat_to_be(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
641646
}
642647
}
643648

649+
template <typename T>
650+
void wait_for_stat_to_be_lte(ENGINE_HANDLE* h,
651+
ENGINE_HANDLE_V1* h1,
652+
const char* stat,
653+
T final,
654+
const char* stat_key,
655+
const time_t max_wait_time_in_secs) {
656+
useconds_t sleepTime = 128;
657+
WaitTimeAccumulator<T> accumulator("to be less than or equal to",
658+
stat,
659+
stat_key,
660+
final,
661+
max_wait_time_in_secs);
662+
for (;;) {
663+
auto current = get_stat<T>(h, h1, stat, stat_key);
664+
if (current <= final) {
665+
break;
666+
}
667+
accumulator.incrementAndAbortIfLimitReached(current, sleepTime);
668+
decayingSleep(&sleepTime);
669+
}
670+
}
671+
644672
/**
645673
* Function that does an exponential wait for a 'val' to reach 'expected'
646674
*

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,8 +687,8 @@ ENGINE_ERROR_CODE TestDcpConsumer::openStreams() {
687687
std::stringstream stats_takeover;
688688
stats_takeover << "dcp-vbtakeover " << ctx.vbucket
689689
<< " " << name.c_str();
690-
wait_for_stat_to_be_lte(h, h1, "estimate", static_cast<int>(est),
691-
stats_takeover.str().c_str());
690+
wait_for_stat_to_be_lte(
691+
h, h1, "estimate", est, stats_takeover.str().c_str());
692692
}
693693

694694
if (ctx.flags & DCP_ADD_STREAM_FLAG_DISKONLY) {

engines/ep/tests/mock/mock_stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class MockActiveStream : public ActiveStream {
121121
return lastReadSeqno;
122122
}
123123

124-
int getNumBackfillItemsRemaining() const {
124+
boost::optional<size_t> getNumBackfillItemsRemaining() const {
125125
return backfillRemaining;
126126
}
127127

0 commit comments

Comments
 (0)