Skip to content

Commit c25b912

Browse files
committed
MB-34173: 1/2 Persist a valid snapshot range during backfill
If a backfill was very large and the backfill queue had grown to be > flusher_batch_split_trigger before the flusher visits the vb, the code path taken results in an uninitialised snapshot_range_t being persisted. To address this we ensure the range is correctly initialised in the path where the backfill queue is > flusher_batch_split_trigger. Change-Id: Idcdbe9fa9c7f3807d08372e3ebf24c106a449d9f Reviewed-on: http://review.couchbase.org/109269 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 4e8f55f commit c25b912

File tree

9 files changed

+126
-26
lines changed

9 files changed

+126
-26
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,16 +1272,15 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
12721272
"manager on vb:%" PRIu16,
12731273
name.c_str(),
12741274
vbucketId);
1275-
return {};
1275+
return {0, 0};
12761276
}
12771277

12781278
auto& cursor = it->second;
12791279

12801280
// Fetch whole checkpoints; as long as we don't exceed the approx item
12811281
// limit.
1282-
ItemsForCursor result;
1283-
result.range.start = (*cursor.currentCheckpoint)->getSnapshotStartSeqno();
1284-
result.range.end = (*cursor.currentCheckpoint)->getSnapshotEndSeqno();
1282+
ItemsForCursor result((*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
1283+
(*cursor.currentCheckpoint)->getSnapshotEndSeqno());
12851284
size_t itemCount = 0;
12861285
while ((result.moreAvailable = incrCursor(cursor))) {
12871286
queued_item& qi = *(cursor.currentPos);
@@ -1627,10 +1626,9 @@ snapshot_info_t CheckpointManager::getSnapshotInfo() {
16271626
"checkpointList is empty");
16281627
}
16291628

1630-
snapshot_info_t info;
1631-
info.range.start = checkpointList.back()->getSnapshotStartSeqno();
1632-
info.start = lastBySeqno;
1633-
info.range.end = checkpointList.back()->getSnapshotEndSeqno();
1629+
snapshot_info_t info(lastBySeqno,
1630+
{checkpointList.back()->getSnapshotStartSeqno(),
1631+
checkpointList.back()->getSnapshotEndSeqno()});
16341632

16351633
// If there are no items in the open checkpoint then we need to resume by
16361634
// using that sequence numbers of the last closed snapshot. The exception is

engines/ep/src/checkpoint.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,9 @@ class CheckpointManager {
628628

629629
/// Return type of getItemsForCursor()
630630
struct ItemsForCursor {
631-
snapshot_range_t range = {0, 0};
631+
ItemsForCursor(uint64_t start, uint64_t end) : range(start, end) {
632+
}
633+
snapshot_range_t range;
632634
bool moreAvailable = {false};
633635
};
634636

engines/ep/src/ep_types.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,22 @@ std::string to_string(GenerateBySeqno generateBySeqno);
4848
std::string to_string(GenerateCas generateCas);
4949
std::string to_string(TrackCasDrift trackCasDrift);
5050

51-
typedef struct {
51+
struct snapshot_range_t {
52+
snapshot_range_t(uint64_t start, uint64_t end) : start(start), end(end) {
53+
// @todo: enforce start < end in a non-maintenance release
54+
}
55+
5256
uint64_t start;
5357
uint64_t end;
54-
} snapshot_range_t;
58+
};
5559

56-
typedef struct {
60+
struct snapshot_info_t {
61+
snapshot_info_t(uint64_t start, snapshot_range_t range)
62+
: start(start), range(range) {
63+
}
5764
uint64_t start;
5865
snapshot_range_t range;
59-
} snapshot_info_t;
66+
};
6067

6168
/**
6269
* The following options can be specified

engines/ep/src/vbucket.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -343,25 +343,27 @@ VBucket::ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
343343
// Note that it is only valid to queue a complete checkpoint - this is where
344344
// the "approx" in the limit comes from.
345345
const auto ckptMgrLimit = approxLimit - result.items.size();
346-
CheckpointManager::ItemsForCursor ckptItems;
346+
bool ckptItemsAvailable = true;
347347
if (ckptMgrLimit > 0) {
348348
auto _begin_ = ProcessClock::now();
349-
ckptItems = checkpointManager->getItemsForCursor(
349+
auto ckptItems = checkpointManager->getItemsForCursor(
350350
CheckpointManager::pCursorName, result.items, ckptMgrLimit);
351351
result.range = ckptItems.range;
352+
ckptItemsAvailable = ckptItems.moreAvailable;
352353
stats.persistenceCursorGetItemsHisto.add(
353354
std::chrono::duration_cast<std::chrono::microseconds>(
354355
ProcessClock::now() - _begin_));
355356
} else {
356357
// We haven't got sufficient remaining capacity to read items from
357358
// CheckpoitnManager, therefore we must assume that there /could/
358-
// more data to follow.
359-
ckptItems.moreAvailable = true;
359+
// more data to follow (leaving ckptItemsAvailable true). We also must
360+
// ensure the valid snapshot range is returned
361+
result.range = checkpointManager->getSnapshotInfo().range;
360362
}
361363

362364
// Check if there's any more items remaining.
363365
result.moreAvailable =
364-
!rejectQueue.empty() || !backfillEmpty || ckptItems.moreAvailable;
366+
!rejectQueue.empty() || !backfillEmpty || ckptItemsAvailable;
365367

366368
return result;
367369
}

engines/ep/src/vbucket.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ class VBucket : public std::enable_shared_from_this<VBucket> {
418418

419419
struct ItemsToFlush {
420420
std::vector<queued_item> items;
421-
snapshot_range_t range;
421+
snapshot_range_t range{0, 0};
422422
bool moreAvailable = false;
423423
};
424424

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4521,6 +4521,79 @@ static enum test_result test_dcp_replica_stream_backfill(ENGINE_HANDLE *h,
45214521
return SUCCESS;
45224522
}
45234523

4524+
// Test generates a replica VB and splits the generation with a warmup.
4525+
// Importantly the very first batch of DCP items are marked as 'backfill' and
4526+
// the test requires that the flusher_batch_split_trigger setting is less than
4527+
// the size of the first batch.
4528+
static enum test_result test_dcp_replica_stream_backfill_MB_34173(
4529+
ENGINE_HANDLE* h, ENGINE_HANDLE_V1* h1) {
4530+
check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
4531+
"Failed to set vbucket state.");
4532+
const int items = 100;
4533+
4534+
// Validate that the flusher will split the items
4535+
checkgt(items,
4536+
get_int_stat(h, h1, "ep_flusher_batch_split_trigger", "config"),
4537+
"flusher_batch_split_trigger must be less than items");
4538+
4539+
const void *cookie = testHarness.create_cookie();
4540+
uint32_t opaque = 0xFFFF0000;
4541+
uint32_t seqno = 0;
4542+
uint32_t flags = 0;
4543+
const char* name = "MB_34173";
4544+
4545+
checkeq(ENGINE_SUCCESS,
4546+
h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
4547+
"Failed dcp producer open connection.");
4548+
std::string type = get_str_stat(h, h1, "eq_dcpq:MB_34173:type", "dcp");
4549+
checkeq(0, type.compare("consumer"), "Consumer not found");
4550+
4551+
opaque = add_stream_for_consumer(h, h1, cookie, opaque, 0, 0,
4552+
PROTOCOL_BINARY_RESPONSE_SUCCESS);
4553+
// backfill items 1 to 100
4554+
dcp_stream_to_replica(h, h1, cookie, opaque, 0, 0x02, 1, items, 1, items);
4555+
wait_for_flusher_to_settle(h, h1);
4556+
wait_for_stat_to_be(h, h1, "vb_0:high_seqno", items, "vbucket-seqno");
4557+
4558+
testHarness.destroy_cookie(cookie);
4559+
4560+
testHarness.reload_engine(&h,
4561+
&h1,
4562+
testHarness.engine_path,
4563+
testHarness.get_current_testcase()->cfg,
4564+
true,
4565+
true);
4566+
wait_for_warmup_complete(h, h1);
4567+
4568+
cookie = testHarness.create_cookie();
4569+
opaque = 0xFFFF0000;
4570+
checkeq(ENGINE_SUCCESS,
4571+
h1->dcp.open(h, cookie, opaque, seqno, flags, name, {}),
4572+
"Failed dcp producer open connection.");
4573+
4574+
type = get_str_stat(h, h1, "eq_dcpq:MB_34173:type", "dcp");
4575+
checkeq(0, type.compare("consumer"), "Consumer not found");
4576+
4577+
opaque = add_stream_for_consumer(
4578+
h, h1, cookie, opaque, 0, 0, PROTOCOL_BINARY_RESPONSE_SUCCESS);
4579+
4580+
// A second batch could fail if MB-34173 is not fixed, I say could because
4581+
// the corruption of the snapshot range may not yield a failure...
4582+
dcp_stream_to_replica(h,
4583+
h1,
4584+
cookie,
4585+
opaque,
4586+
0,
4587+
0x02,
4588+
items + 1,
4589+
items + 10,
4590+
items + 1,
4591+
items + 10);
4592+
4593+
testHarness.destroy_cookie(cookie);
4594+
return SUCCESS;
4595+
}
4596+
45244597
static enum test_result test_dcp_replica_stream_in_memory(ENGINE_HANDLE *h,
45254598
ENGINE_HANDLE_V1 *h1)
45264599
{
@@ -6083,6 +6156,12 @@ BaseTestCase testsuite_testcases[] = {
60836156
"chk_remover_stime=1;max_checkpoints=2",
60846157
prepare,
60856158
cleanup),
6159+
TestCase("test dcp replica stream backfill and warmup (MB-34173)",
6160+
test_dcp_replica_stream_backfill_MB_34173, test_setup, teardown,
6161+
"chk_remover_stime=1;max_checkpoints=2;"
6162+
"flusher_batch_split_trigger=10",
6163+
prepare_ep_bucket,
6164+
cleanup),
60866165
TestCase("test dcp replica stream in-memory",
60876166
test_dcp_replica_stream_in_memory, test_setup, teardown,
60886167
"chk_remover_stime=1;max_checkpoints=2", prepare, cleanup),

engines/ep/tests/module_tests/checkpoint_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,9 @@ TYPED_TEST(CheckpointTest, ItemBasedCheckpointCreation) {
566566
// moves the single cursor registered outside of the initial checkpoint,
567567
// allowing a new open checkpoint to be created.
568568
EXPECT_EQ(1, this->manager->getNumOfCursors());
569-
snapshot_range_t range;
570569
std::vector<queued_item> items;
571-
range = this->manager->getAllItemsForCursor(CheckpointManager::pCursorName,
572-
items);
570+
snapshot_range_t range = this->manager->getAllItemsForCursor(
571+
CheckpointManager::pCursorName, items);
573572

574573
EXPECT_EQ(0, range.start);
575574
EXPECT_EQ(1021, range.end);

engines/ep/tests/module_tests/vbucket_test.cc

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ VBucketTestBase::VBucketTestBase(item_eviction_policy_t eviction_policy) {
6161
global_stats,
6262
checkpoint_config,
6363
/*kvshard*/ nullptr,
64-
/*lastSeqno*/ 1000,
65-
/*lastSnapStart*/ 0,
66-
/*lastSnapEnd*/ 0,
64+
lastSeqno,
65+
range.start,
66+
range.end,
6767
/*table*/ nullptr,
6868
std::make_shared<DummyCB>(),
6969
/*newSeqnoCb*/ nullptr,
@@ -571,6 +571,8 @@ TEST_P(VBucketTest, GetItemsForCursor_Limit) {
571571
EXPECT_STREQ("1", result.items[1]->getKey().c_str());
572572
EXPECT_STREQ("2", result.items[2]->getKey().c_str());
573573
EXPECT_TRUE(result.items[3]->isCheckPointMetaItem());
574+
EXPECT_EQ(range.start, result.range.start);
575+
EXPECT_EQ(range.end + 2, result.range.end);
574576

575577
// Asking for 5 items should give us all items in second checkpoint and
576578
// third checkpoint - 7 total
@@ -585,6 +587,8 @@ TEST_P(VBucketTest, GetItemsForCursor_Limit) {
585587
EXPECT_TRUE(result.items[4]->isCheckPointMetaItem());
586588
EXPECT_STREQ("5", result.items[5]->getKey().c_str());
587589
EXPECT_STREQ("6", result.items[6]->getKey().c_str());
590+
EXPECT_EQ(range.end + 2, result.range.start);
591+
EXPECT_EQ(range.end + 6, result.range.end);
588592
}
589593

590594
// Check that getItemsToPersist() can correctly impose a limit on items fetched.
@@ -600,7 +604,8 @@ TEST_P(VBucketTest, GetItemsToPersist_Limit) {
600604

601605
// Add 2 items to checkpoint manager (in addition to initial
602606
// checkpoint_start).
603-
auto keys = generateKeys(2, 5);
607+
const int itemsToGenerate = 2;
608+
auto keys = generateKeys(itemsToGenerate, 5);
604609
setMany(keys, MutationStatus::WasClean);
605610

606611
// Test - fetch items in chunks spanning the different item sources.
@@ -610,12 +615,16 @@ TEST_P(VBucketTest, GetItemsToPersist_Limit) {
610615
EXPECT_TRUE(result.moreAvailable);
611616
EXPECT_EQ(1, result.items.size());
612617
EXPECT_STREQ("1", result.items[0]->getKey().c_str());
618+
EXPECT_EQ(range.start, result.range.start);
619+
EXPECT_EQ(range.end + itemsToGenerate, result.range.end);
613620

614621
result = this->vbucket->getItemsToPersist(2);
615622
EXPECT_TRUE(result.moreAvailable);
616623
EXPECT_EQ(2, result.items.size());
617624
EXPECT_STREQ("2", result.items[0]->getKey().c_str());
618625
EXPECT_STREQ("3", result.items[1]->getKey().c_str());
626+
EXPECT_EQ(range.start, result.range.start);
627+
EXPECT_EQ(range.end + itemsToGenerate, result.range.end);
619628

620629
// Next call should read 1 item from backfill; and *all* items from
621630
// checkpoint (even through we only asked for 2 total), as it is not valid
@@ -627,6 +636,8 @@ TEST_P(VBucketTest, GetItemsToPersist_Limit) {
627636
EXPECT_TRUE(result.items[1]->isCheckPointMetaItem());
628637
EXPECT_STREQ("5", result.items[2]->getKey().c_str());
629638
EXPECT_STREQ("6", result.items[3]->getKey().c_str());
639+
EXPECT_EQ(range.start, result.range.start);
640+
EXPECT_EQ(range.end + itemsToGenerate, result.range.end);
630641
}
631642

632643
// Check that getItemsToPersist() correctly returns `moreAvailable` if we

engines/ep/tests/module_tests/vbucket_test.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,7 @@ class VBucketTestBase {
9898
CheckpointConfig checkpoint_config;
9999
Configuration config;
100100
const void* cookie = {};
101+
const uint64_t lastSeqno{1000};
102+
const snapshot_range_t range{5, lastSeqno};
101103
};
102104

0 commit comments

Comments
 (0)