Skip to content

Commit ae6deff

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-35656: Don't backfill completed prepares
We don't need to backfill any completed prepares. If we send them to the replica then the replica will just purge them on the next compaction (regardless of the pruge seqno) because they are not needed. Change-Id: I29e746e49ecdb2fe399f2d47dee76545b6bf00da Reviewed-on: http://review.couchbase.org/113710 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 07cbd65 commit ae6deff

File tree

6 files changed

+30
-8
lines changed

6 files changed

+30
-8
lines changed

engines/ep/src/dcp/backfill_disk.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,13 @@ void DiskCallback::callback(GetValue& val) {
124124
throw std::invalid_argument("DiskCallback::callback: val is NULL");
125125
}
126126

127+
// Don't need to send any completed prepares to the replica. The compactor
128+
// would just remove them on it's next run anyway.
129+
if (val.item->isPending() &&
130+
static_cast<uint64_t>(val.item->getBySeqno()) <= highCompletedSeqno) {
131+
return;
132+
}
133+
127134
// MB-26705: Make the backfilled item cold so ideally the consumer would
128135
// evict this before any cached item if they get into memory pressure.
129136
val.item->setNRUValue(MAX_NRU_VALUE);
@@ -247,6 +254,7 @@ backfill_status_t DCPBackfillDisk::create() {
247254
stream->setDead(status);
248255
transitionState(backfill_state_done);
249256
} else {
257+
cb->setHighCompletedSeqno(scanCtx->highCompletedSeqno);
250258
stream->incrBackfillRemaining(scanCtx->documentCount);
251259
stream->markDiskSnapshot(
252260
startSeqno, scanCtx->maxSeqno, scanCtx->highCompletedSeqno);

engines/ep/src/dcp/backfill_disk.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,13 @@ class DiskCallback : public StatusCallback<GetValue> {
6161

6262
void callback(GetValue& val);
6363

64+
void setHighCompletedSeqno(uint64_t seqno) {
65+
highCompletedSeqno = seqno;
66+
};
67+
6468
private:
6569
std::weak_ptr<ActiveStream> streamPtr;
70+
uint64_t highCompletedSeqno;
6671
};
6772

6873
/**

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,15 @@ backfill_status_t DCPBackfillMemoryBuffered::scan() {
299299
// mutated with the HashBucketLock, so get the correct bucket lock
300300
// before calling StoredValue::toItem
301301
auto hbl = evb->ht.getLockedBucket((*rangeItr).getKey());
302+
303+
// Don't need to send any completed prepares, the purging task will
304+
// just remove them on the replica anyway
305+
if ((*rangeItr).getCommitted() ==
306+
CommittedState::PrepareCommitted) {
307+
++rangeItr;
308+
continue;
309+
}
310+
302311
// Ephemeral only supports a durable write level of Majority so
303312
// instead of storing a durability level in our OrderedStoredValues
304313
// we can just assume that all durable writes have the Majority

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -829,8 +829,9 @@ TEST_P(DurabilityActiveStreamTest,
829829
auto& bfm = producer->getBFM();
830830
bfm.backfill();
831831
bfm.backfill();
832-
EXPECT_EQ(3, stream->public_readyQSize());
833-
stream->consumeBackfillItems(3);
832+
// We won't backfill any completed prepares
833+
EXPECT_EQ(2, stream->public_readyQSize());
834+
stream->consumeBackfillItems(2);
834835

835836
EXPECT_EQ(ENGINE_SUCCESS, stream->seqnoAck(replica, 1 /*prepareSeqno*/));
836837

@@ -876,7 +877,7 @@ TEST_P(DurabilityActiveStreamTest, DiskSnapshotSendsHCSWithSyncRepSupport) {
876877
ASSERT_EQ(0, producer->getBytesOutstanding());
877878

878879
// readyQ must contain a SnapshotMarker
879-
ASSERT_EQ(3, stream->public_readyQSize());
880+
ASSERT_EQ(2, stream->public_readyQSize());
880881
auto resp = stream->public_nextQueuedItem();
881882
ASSERT_TRUE(resp);
882883
EXPECT_EQ(DcpResponse::Event::SnapshotMarker, resp->getEvent());

engines/ep/tests/module_tests/dcp_reflection_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ void DCPLoopbackStreamTest::testBackfillAndInMemoryDuplicatePrepares(
645645
// (SNAP_MARKER, PRE, CMT, SET), (SNAP_MARKER, PRE), with a flush after the
646646
// first 4.
647647
route0_1.transferSnapshotMarker(0, 3, MARKER_FLAG_CHK | MARKER_FLAG_DISK);
648-
route0_1.transferMessage(DcpResponse::Event::Prepare);
648+
// We won't backfill and send a completed prepare so first item is CMT.
649649
route0_1.transferMutation(makeStoredDocKey("a"), 2);
650650
route0_1.transferMutation(makeStoredDocKey("b"), 3);
651651

engines/ep/tests/module_tests/dcp_stream_sync_repl_test.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ void DcpStreamSyncReplTest::testBackfillPrepareCommit(
564564
// Wait for the backfill task to have pushed all items to the Stream::readyQ
565565
// Note: we expect 1 SnapshotMarker + numItems in the readyQ
566566
std::chrono::microseconds uSleepTime(128);
567-
while (stream->public_readyQSize() < 1 + 2) {
567+
while (stream->public_readyQSize() < 1 + 1) {
568568
uSleepTime = decayingSleep(uSleepTime);
569569
}
570570

@@ -574,9 +574,8 @@ void DcpStreamSyncReplTest::testBackfillPrepareCommit(
574574
EXPECT_EQ(0, dcpSnapMarker.getStartSeqno());
575575
EXPECT_EQ(2, dcpSnapMarker.getEndSeqno());
576576

577-
item = stream->public_nextQueuedItem();
578-
verifyDcpPrepare(*prepared, *item);
579-
577+
// We won't backfill a completed prepare as the replica would just compact
578+
// it away.
580579
item = stream->public_nextQueuedItem();
581580
// In general, a backfill from disk will send a mutation instead of a
582581
// commit as we may have de-duped the preceding prepare. The only case where

0 commit comments

Comments
 (0)