Skip to content

Commit e462ee0

Browse files
paolococchi and daverigby
authored and committed
MB-38444: Set Backfill SnapEndSeqno to SeqList HighSeqno
This patch fixes an inconsistency between the EndSeqno and MaxVisibleSeqno that we send in SnapMarker at Backfill in Ephemeral. That is, we may end up breaking the (MVS <= EndSeqno) invariant, which could cause: 1) a wrong MVS > EndSeqno sent in the marker; 2) a wrong MVS and EndSeqno. (1) may happen when the connection supports SyncReplication, while (2) may happen when the connection doesn't support SR. *Details* There is a substantial difference in how we create the Backfill Range between Persistent (where everything works fine) and Ephemeral. That is, for Persistence we pick up the full range of the on-disk seqno-index (ie, all data indexed in the latest on-disk header). By doing that, EndSeqno and MVS are consistent. While in Ephemeral we set the EndSeqno of the Backfill Range based on what we have in the CheckpointManager. To keep it simple, apart from some exceptions that is (EndSeqno = FirstCkptSeqno - 1). Then we consider the MVS from the Ephemeral storage (ie, the SeqList), which in general is not consistent with the computed EndSeqno. With this patch Ephemeral's behaviour matches the one in Persistent, ie Backfill picks up all data in the storage. Which makes the existing usage of MVS correct. Change-Id: I4c504b8e161f1d5463757414e752a77c885fe25e Reviewed-on: http://review.couchbase.org/c/kv_engine/+/144019 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 5f9e3d3 commit e462ee0

File tree

3 files changed

+126
-28
lines changed

3 files changed

+126
-28
lines changed

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -152,34 +152,20 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
152152
remaining count */
153153
while (rangeItr.curr() != rangeItr.end()) {
154154
if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
155-
/* Determine the endSeqno of the current snapshot.
156-
We want to send till requested endSeqno, but if that cannot
157-
constitute a snapshot then we need to send till the point
158-
which can be called as snapshot end */
159-
endSeqno = std::max(
160-
endSeqno,
161-
static_cast<uint64_t>(rangeItr.getEarlySnapShotEnd()));
162-
163-
/* We want to send items only till the point it is necessary to do
164-
so */
165-
endSeqno =
166-
std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
167-
168-
/* Mark disk snapshot */
155+
// Backfill covers the full SeqList range.
156+
endSeqno = rangeItr.back();
157+
158+
// Send SnapMarker
169159
bool markerSent =
170160
stream->markDiskSnapshot(startSeqno,
171161
endSeqno,
172162
evb->getHighCompletedSeqno(),
173163
rangeItr.getMaxVisibleSeqno());
174164

175165
if (markerSent) {
176-
/* Set backfill remaining
177-
[EPHE TODO]: This will be inaccurate if do not backfill till
178-
end of the iterator Additionally, this value may be an
179-
overestimate even if backfilled to the iterator end - it
180-
includes prepares/aborts which will not be sent if the stream
181-
is not sync write aware
182-
*/
166+
// @todo: This value may be an overestimate, as it includes
167+
// prepares/aborts which will not be sent if the stream is not
168+
// sync write aware
183169
stream->setBackfillRemaining(rangeItr.count());
184170

185171
/* Change the backfill state and return for next stage. */

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2362,19 +2362,17 @@ static enum test_result test_dcp_producer_stream_req_full(EngineIface* h) {
23622362

23632363
checkne(num_items - get_stat<uint64_t>(h, "ep_items_rm_from_checkpoints"),
23642364
uint64_t{0},
2365-
"Require a non-zero number of items to still be present in "
2366-
"CheckpointManager to be able to get 2x snapshot markers "
2367-
"(1x disk, 1x memory)");
2365+
"Ensure a non-zero number of items to still be present in "
2366+
"CheckpointManager to verify that we still get all mutations in the"
2367+
" storage in a single Backfill snapshot");
23682368

23692369
const void* cookie = testHarness->create_cookie();
23702370

23712371
DcpStreamCtx ctx;
23722372
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
23732373
ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
23742374
ctx.exp_mutations = num_items;
2375-
/* Memory backfill sends items from checkpoint snapshots as much as possible
2376-
Relies on backfill only when checkpoint snapshot is cleaned up */
2377-
ctx.exp_markers = 2;
2375+
ctx.exp_markers = 1;
23782376

23792377
TestDcpConsumer tdc("unittest", cookie, h);
23802378
tdc.addStreamCtx(ctx);
@@ -2512,7 +2510,14 @@ static enum test_result test_dcp_producer_stream_req_backfill(EngineIface* h) {
25122510
DcpStreamCtx ctx;
25132511
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
25142512
ctx.seqno = {0, 200};
2515-
ctx.exp_mutations = 200;
2513+
// The idea here is that at backfill we get the full Disk/SeqList snapshot.
2514+
// Persistence has been stopped at seqno 200, while Ephemeral stores all
2515+
// seqnos in the SeqList.
2516+
if (isEphemeralBucket(h)) {
2517+
ctx.exp_mutations = 400;
2518+
} else {
2519+
ctx.exp_mutations = 200;
2520+
}
25162521
ctx.exp_markers = 1;
25172522

25182523
TestDcpConsumer tdc("unittest", cookie, h);

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4032,6 +4032,113 @@ TEST_P(SingleThreadedPassiveStreamTest, MB42780_DiskToMemoryFromPre65) {
40324032
ASSERT_EQ(4, manager.getHighSeqno());
40334033
}
40344034

4035+
/**
4036+
* MB-38444: We fix an Ephemeral-only bug, but the test covers Persistent buckets too
4037+
*/
4038+
TEST_P(SingleThreadedActiveStreamTest, BackfillRangeCoversAllDataInTheStorage) {
4039+
// We need to re-create the stream in a condition that triggers a backfill
4040+
stream.reset();
4041+
producer.reset();
4042+
4043+
auto& vb = *engine->getVBucket(vbid);
4044+
ASSERT_EQ(0, vb.getHighSeqno());
4045+
auto& manager = *vb.checkpointManager;
4046+
const auto& list =
4047+
CheckpointManagerTestIntrospector::public_getCheckpointList(
4048+
manager);
4049+
ASSERT_EQ(1, list.size());
4050+
4051+
const auto keyA = makeStoredDocKey("keyA");
4052+
const std::string value = "value";
4053+
store_item(vbid, keyA, value);
4054+
EXPECT_EQ(1, vb.getHighSeqno());
4055+
EXPECT_EQ(1, vb.getMaxVisibleSeqno());
4056+
const auto keyB = makeStoredDocKey("keyB");
4057+
store_item(vbid, keyB, value);
4058+
EXPECT_EQ(2, vb.getHighSeqno());
4059+
EXPECT_EQ(2, vb.getMaxVisibleSeqno());
4060+
4061+
// Steps to ensure a backfill when we re-create the stream further below
4062+
manager.createNewCheckpoint();
4063+
flushVBucketToDiskIfPersistent(vbid, 2 /*expected_num_flushed*/);
4064+
ASSERT_EQ(2, list.size());
4065+
bool newCkptCreated;
4066+
EXPECT_EQ(2, manager.removeClosedUnrefCheckpoints(vb, newCkptCreated));
4067+
EXPECT_FALSE(newCkptCreated);
4068+
ASSERT_EQ(1, list.size());
4069+
ASSERT_EQ(0, manager.getNumOpenChkItems());
4070+
4071+
// Move high-seqno to 4
4072+
const auto keyC = makeStoredDocKey("keyC");
4073+
store_item(vbid, keyC, value);
4074+
EXPECT_EQ(3, vb.getHighSeqno());
4075+
EXPECT_EQ(3, vb.getMaxVisibleSeqno());
4076+
const auto keyD = makeStoredDocKey("keyD");
4077+
store_item(vbid, keyD, value);
4078+
EXPECT_EQ(4, vb.getHighSeqno());
4079+
EXPECT_EQ(4, vb.getMaxVisibleSeqno());
4080+
4081+
// At this point we have high-seqno=4 but only seqnos 3 and 4 in the CM, so
4082+
// we'll backfill.
4083+
4084+
// Note: The aim here is to verify that Backfill picks up everything from
4085+
// the storage even in the case where some seqnos are in the CM. So, we need
4086+
// to ensure that all seqnos are on-disk for Persistent.
4087+
flushVBucketToDiskIfPersistent(vbid, 2 /*expected_num_flushed*/);
4088+
4089+
// Re-create producer and stream
4090+
recreateProducerAndStream(vb, 0 /*flags*/);
4091+
ASSERT_TRUE(producer);
4092+
producer->createCheckpointProcessorTask();
4093+
ASSERT_TRUE(stream);
4094+
ASSERT_TRUE(stream->isBackfilling());
4095+
ASSERT_TRUE(stream->public_supportSyncReplication());
4096+
auto resp = stream->next();
4097+
EXPECT_FALSE(resp);
4098+
4099+
// Drive the backfill - execute
4100+
auto& bfm = producer->getBFM();
4101+
ASSERT_EQ(1, bfm.getNumBackfills());
4102+
4103+
// Backfill::create
4104+
// Before the fix this step generates SnapMarker{start:0, end:2, mvs:4},
4105+
// while we want SnapMarker{start:0, end:4, mvs:4}
4106+
ASSERT_EQ(backfill_success, bfm.backfill());
4107+
const auto& readyQ = stream->public_readyQ();
4108+
ASSERT_EQ(1, readyQ.size());
4109+
resp = stream->next();
4110+
ASSERT_TRUE(resp);
4111+
EXPECT_EQ(DcpResponse::Event::SnapshotMarker, resp->getEvent());
4112+
auto snapMarker = dynamic_cast<SnapshotMarker&>(*resp);
4113+
EXPECT_EQ(0, snapMarker.getStartSeqno());
4114+
EXPECT_EQ(4, snapMarker.getEndSeqno());
4115+
EXPECT_EQ(4, *snapMarker.getMaxVisibleSeqno());
4116+
4117+
// Verify that all seqnos are sent at Backfill::scan
4118+
ASSERT_EQ(backfill_success, bfm.backfill());
4119+
ASSERT_EQ(4, readyQ.size());
4120+
resp = stream->next();
4121+
ASSERT_TRUE(resp);
4122+
EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
4123+
EXPECT_EQ(1, *resp->getBySeqno());
4124+
ASSERT_EQ(3, readyQ.size());
4125+
resp = stream->next();
4126+
ASSERT_TRUE(resp);
4127+
EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
4128+
EXPECT_EQ(2, *resp->getBySeqno());
4129+
ASSERT_EQ(2, readyQ.size());
4130+
resp = stream->next();
4131+
ASSERT_TRUE(resp);
4132+
EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
4133+
EXPECT_EQ(3, *resp->getBySeqno());
4134+
ASSERT_EQ(1, readyQ.size());
4135+
resp = stream->next();
4136+
ASSERT_TRUE(resp);
4137+
EXPECT_EQ(DcpResponse::Event::Mutation, resp->getEvent());
4138+
EXPECT_EQ(4, *resp->getBySeqno());
4139+
ASSERT_EQ(0, readyQ.size());
4140+
}
4141+
40354142
INSTANTIATE_TEST_CASE_P(AllBucketTypes,
40364143
SingleThreadedActiveStreamTest,
40374144
STParameterizedBucketTest::allConfigValues(),

0 commit comments

Comments
 (0)