Skip to content

Commit 0e66297

Browse files
committed
MB-54850: Skip magma key-lookup optimization if Disk snap is Historical
As an optimization for improving the performance at flush, we tell magma whether a replica is receiving the initial disk snapshot, i.e. the vbucket is empty. In that case magma can skip the key lookups required for maintaining the item-count, as any mutation received is necessarily an Insert. That is no longer the case when the initial disk snapshot is Historical, as the snapshot might contain duplicates. Change-Id: I453339fd9dcbe9cbd4dcd8f4c54a18cd8543d5ca Reviewed-on: https://review.couchbase.org/c/kv_engine/+/184967 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]>
1 parent 9cb371e commit 0e66297

File tree

10 files changed

+232
-25
lines changed

10 files changed

+232
-25
lines changed

engines/ep/src/checkpoint.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,10 @@ class Checkpoint {
434434
checkpointType = type;
435435
}
436436

437+
void setHistorical(CheckpointHistorical hist) {
438+
historical = hist;
439+
}
440+
437441
void setHighCompletedSeqno(std::optional<uint64_t> seqno) {
438442
highCompletedSeqno = seqno;
439443
}
@@ -629,6 +633,10 @@ class Checkpoint {
629633
*/
630634
void applyQueuedItemsMemUsageDecrement(size_t size);
631635

636+
CheckpointHistorical getHistorical() const {
637+
return historical;
638+
}
639+
632640
// Memory overhead of the toWrite container (a list), ie 3 ptrs (forward,
633641
// backwards and element pointers) per element in the list.
634642
static constexpr uint8_t per_item_queue_overhead = 3 * sizeof(uintptr_t);
@@ -764,7 +772,7 @@ class Checkpoint {
764772

765773
// Whether the snapshot stored in this checkpoint is part of a historical
766774
// sequence of mutations.
767-
const CheckpointHistorical historical;
775+
CheckpointHistorical historical;
768776

769777
friend std::ostream& operator <<(std::ostream& os, const Checkpoint& m);
770778
};

engines/ep/src/checkpoint_manager.cc

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ CheckpointType CheckpointManager::getOpenCheckpointType() const {
106106
return getOpenCheckpoint(lh).getCheckpointType();
107107
}
108108

109+
CheckpointHistorical CheckpointManager::getOpenCheckpointHistorical() const {
110+
std::lock_guard<std::mutex> lh(queueLock);
111+
return getOpenCheckpoint(lh).getHistorical();
112+
}
113+
109114
Checkpoint& CheckpointManager::getOpenCheckpoint(
110115
const std::lock_guard<std::mutex>&) const {
111116
// During its lifetime, the checkpointList can only be in one of the
@@ -966,11 +971,12 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
966971

967972
// Fetch whole checkpoints; as long as we don't exceed the approx item
968973
// limit.
969-
ItemsForCursor result(
970-
(*cursor.getCheckpoint())->getCheckpointType(),
971-
(*cursor.getCheckpoint())->getMaxDeletedRevSeqno(),
972-
(*cursor.getCheckpoint())->getHighCompletedSeqno(),
973-
(*cursor.getCheckpoint())->getVisibleSnapshotEndSeqno());
974+
const auto& checkpoint = **cursor.getCheckpoint();
975+
ItemsForCursor result(checkpoint.getCheckpointType(),
976+
checkpoint.getMaxDeletedRevSeqno(),
977+
checkpoint.getHighCompletedSeqno(),
978+
checkpoint.getVisibleSnapshotEndSeqno(),
979+
checkpoint.getHistorical());
974980

975981
// Only enforce a hard limit for Disk Checkpoints (i.e backfill). This will
976982
// prevent huge memory growth due to flushing vBuckets on replicas during a
@@ -1309,6 +1315,7 @@ void CheckpointManager::createSnapshot(
13091315
openCkpt.setSnapshotStartSeqno(snapStartSeqno);
13101316
openCkpt.setSnapshotEndSeqno(snapEndSeqno, visibleSnapEnd);
13111317
openCkpt.setCheckpointType(checkpointType);
1318+
openCkpt.setHistorical(historical);
13121319
openCkpt.setHighCompletedSeqno(highCompletedSeqno);
13131320
return;
13141321
}

engines/ep/src/checkpoint_manager.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,10 @@ class CheckpointManager {
8383
ItemsForCursor(CheckpointType checkpointType,
8484
std::optional<uint64_t> maxDeletedRevSeqno,
8585
std::optional<uint64_t> highCompletedSeqno,
86-
uint64_t visibleSeqno)
86+
uint64_t visibleSeqno,
87+
CheckpointHistorical historical)
8788
: checkpointType(checkpointType),
89+
historical(historical),
8890
maxDeletedRevSeqno(maxDeletedRevSeqno),
8991
highCompletedSeqno(highCompletedSeqno),
9092
visibleSeqno(visibleSeqno) {
@@ -99,6 +101,8 @@ class CheckpointManager {
99101
*/
100102
CheckpointType checkpointType = CheckpointType::Memory;
101103

104+
CheckpointHistorical historical = CheckpointHistorical::No;
105+
102106
std::optional<uint64_t> maxDeletedRevSeqno = {};
103107

104108
/**
@@ -180,6 +184,8 @@ class CheckpointManager {
180184

181185
CheckpointType getOpenCheckpointType() const;
182186

187+
CheckpointHistorical getOpenCheckpointHistorical() const;
188+
183189
/**
184190
* Removes closed unreferenced checkpoints from the checkpoint-list and
185191
* frees up their used memory.

engines/ep/src/ep_bucket.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,8 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
483483
// KVStore implementations since a lookup isn't needed. This scenario
484484
// commonly occurs in rebalance where a vbucket is taken over by a new node.
485485
if (toFlush.ranges.size() == 1 &&
486-
toFlush.checkpointType == CheckpointType::InitialDisk) {
486+
toFlush.checkpointType == CheckpointType::InitialDisk &&
487+
toFlush.historical == CheckpointHistorical::No) {
487488
writeOp = WriteOperation::Insert;
488489
}
489490

engines/ep/src/vbucket.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ ItemsToFlush VBucket::getItemsToPersist(size_t approxLimit) {
442442
result.ranges = std::move(rangeInfo.ranges);
443443
result.maxDeletedRevSeqno = rangeInfo.maxDeletedRevSeqno;
444444
result.checkpointType = rangeInfo.checkpointType;
445+
result.historical = rangeInfo.historical;
445446
result.flushHandle = std::move(rangeInfo.flushHandle);
446447
result.moreAvailable = rangeInfo.moreAvailable;
447448

engines/ep/src/vbucket_types.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ struct ItemsToFlush {
7474
std::optional<uint64_t> maxDeletedRevSeqno = {};
7575
CheckpointType checkpointType = CheckpointType::Memory;
7676

77+
CheckpointHistorical historical = CheckpointHistorical::No;
78+
7779
// See CM::ItemsForCursor for details.
7880
UniqueFlushHandle flushHandle;
7981
};

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 150 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "checkpoint_manager.h"
1515
#include "checkpoint_utils.h"
1616
#include "collections/collections_test_helpers.h"
17+
#include "collections/events_generated.h"
1718
#include "collections/vbucket_manifest_handles.h"
1819
#include "dcp/backfill_disk.h"
1920
#include "dcp/response.h"
@@ -5242,45 +5243,177 @@ INSTANTIATE_TEST_SUITE_P(Persistent,
52425243
STParameterizedBucketTest::magmaConfigValues(),
52435244
STParameterizedBucketTest::PrintToStringParamName);
52445245

5245-
TEST_P(CDCPassiveStreamTest, HistorySnapshotReceived) {
5246-
// Replica receives Snap{1, 3, Disk|History}, with 1->3 mutations of the
5247-
// same key.
5248-
// The test verifies that replica is resilient to duplicates in the disk
5249-
// snapshot and that duplicates are successfully queued into the same
5250-
// checkpoint.
5246+
void CDCPassiveStreamTest::SetUp() {
5247+
// Note: Checkpoint removal isn't under test at all here.
5248+
// Eager checkpoint removal, default prod setting in Neo and post-Neo.
5249+
// That helps in cleaning up the CheckpointManager during the test and
5250+
// we won't need to fix the testsuite when merging into the master
5251+
// branch.
5252+
if (!config_string.empty()) {
5253+
config_string += ";";
5254+
}
5255+
config_string += "checkpoint_removal_mode=eager";
5256+
5257+
STPassiveStreamPersistentTest::SetUp();
5258+
}
5259+
5260+
void CDCPassiveStreamTest::createHistoricalCollection(uint64_t snapStart,
5261+
uint64_t snapEnd) {
5262+
const uint32_t opaque = 1;
5263+
ASSERT_EQ(cb::engine_errc::success,
5264+
consumer->snapshotMarker(
5265+
opaque,
5266+
vbid,
5267+
snapStart,
5268+
snapEnd,
5269+
MARKER_FLAG_DISK | MARKER_FLAG_CHK | MARKER_FLAG_HISTORY,
5270+
{0},
5271+
{}));
5272+
5273+
flatbuffers::FlatBufferBuilder fb;
5274+
Collections::ManifestUid manifestUid;
5275+
const auto collection = CollectionEntry::historical;
5276+
auto fbPayload = Collections::VB::CreateCollection(
5277+
fb,
5278+
manifestUid,
5279+
uint32_t(ScopeEntry::defaultS.getId()),
5280+
uint32_t(collection.getId()),
5281+
false,
5282+
0,
5283+
fb.CreateString(collection.name.data(), collection.name.size()),
5284+
true /*history*/);
5285+
fb.Finish(fbPayload);
5286+
ASSERT_EQ(cb::engine_errc::success,
5287+
consumer->systemEvent(
5288+
1 /*opaque*/,
5289+
vbid,
5290+
mcbp::systemevent::id::CreateCollection,
5291+
1,
5292+
mcbp::systemevent::version::version2,
5293+
{reinterpret_cast<const uint8_t*>(collection.name.data()),
5294+
collection.name.size()},
5295+
{fb.GetBufferPointer(), fb.GetSize()}));
52515296

52525297
const auto& vb = *store->getVBucket(vbid);
5253-
ASSERT_EQ(0, vb.getHighSeqno());
5254-
const auto& manager = *vb.checkpointManager;
5298+
ASSERT_EQ(0, vb.getNumTotalItems());
5299+
ASSERT_EQ(1, vb.getHighSeqno());
5300+
auto& manager = *vb.checkpointManager;
5301+
ASSERT_EQ(1, manager.getNumCheckpoints());
5302+
ASSERT_EQ(1, manager.getNumOpenChkItems());
5303+
ASSERT_EQ(CheckpointType::InitialDisk, manager.getOpenCheckpointType());
5304+
ASSERT_EQ(CheckpointHistorical::Yes, manager.getOpenCheckpointHistorical());
5305+
flush_vbucket_to_disk(vbid, 1);
5306+
}
5307+
5308+
/*
5309+
* HistorySnapshotReceived tests verify (on different scenarios) that:
5310+
* - replica stream is resilient to duplicates on disk snapshot
5311+
* - duplicates are queued in checkpoint (ie, not deduplicated)
5312+
* - duplicates are persisted on disk (again, not deduplicated)
5313+
* - stats (eg item-count) are updated correctly
5314+
*/
5315+
5316+
TEST_P(CDCPassiveStreamTest, HistorySnapshotReceived_Disk) {
5317+
// Replica receives Snap{start, end, Disk|History}, with start->end
5318+
// mutations for the same key.
5319+
5320+
createHistoricalCollection(1, 1);
5321+
5322+
// Clear CM
5323+
const auto& vb = *store->getVBucket(vbid);
5324+
const auto initialHighSeqno = vb.getHighSeqno();
5325+
ASSERT_EQ(1, initialHighSeqno);
5326+
auto& manager = *vb.checkpointManager;
5327+
manager.createNewCheckpoint(true);
52555328
ASSERT_EQ(1, manager.getNumCheckpoints());
52565329
ASSERT_EQ(0, manager.getNumOpenChkItems());
52575330

5258-
const uint32_t opaque = 0;
5331+
const uint32_t opaque = 1;
52595332
SnapshotMarker snapshotMarker(
52605333
opaque,
52615334
vbid,
5262-
1 /*start*/,
5263-
3 /*end*/,
5335+
initialHighSeqno + 1 /*start*/,
5336+
initialHighSeqno + 3 /*end*/,
52645337
MARKER_FLAG_CHK | MARKER_FLAG_DISK | MARKER_FLAG_HISTORY,
52655338
std::optional<uint64_t>(0), /*HCS*/
52665339
{}, /*maxVisibleSeqno*/
52675340
{}, /*timestamp*/
52685341
{} /*streamId*/);
52695342
stream->processMarker(&snapshotMarker);
5343+
ASSERT_EQ(1, manager.getNumCheckpoints());
5344+
ASSERT_EQ(0, manager.getNumOpenChkItems());
5345+
ASSERT_EQ(CheckpointType::Disk, manager.getOpenCheckpointType());
5346+
ASSERT_EQ(CheckpointHistorical::Yes, manager.getOpenCheckpointHistorical());
5347+
5348+
const auto collection = CollectionEntry::historical;
5349+
const std::string key("key");
5350+
const std::string value("value");
5351+
const size_t numItems = 3;
5352+
for (size_t seqno = initialHighSeqno + 1;
5353+
seqno <= initialHighSeqno + numItems;
5354+
++seqno) {
5355+
EXPECT_EQ(
5356+
cb::engine_errc::success,
5357+
stream->messageReceived(makeMutationConsumerMessage(
5358+
opaque, seqno, vbid, value, key, collection.getId())));
5359+
}
5360+
5361+
EXPECT_EQ(initialHighSeqno + numItems, vb.getHighSeqno());
52705362
EXPECT_EQ(1, manager.getNumCheckpoints());
5271-
EXPECT_EQ(0, manager.getNumOpenChkItems());
5363+
EXPECT_EQ(numItems, manager.getNumOpenChkItems());
5364+
5365+
// All duplicates persisted
5366+
flush_vbucket_to_disk(vbid, numItems);
5367+
5368+
// Item count doesn't account for historical revisions
5369+
EXPECT_EQ(1, vb.getNumTotalItems());
5370+
}
5371+
5372+
TEST_P(CDCPassiveStreamTest, HistorySnapshotReceived_InitialDisk) {
5373+
// Replica receives Snap{start, end, InitialDisk|History}, with start->end
5374+
// mutations for the same key.
52725375

5376+
createHistoricalCollection(1, 10);
5377+
5378+
const auto& vb = *store->getVBucket(vbid);
5379+
const auto initialHighSeqno = vb.getHighSeqno();
5380+
ASSERT_EQ(1, initialHighSeqno);
5381+
auto& manager = *vb.checkpointManager;
5382+
ASSERT_EQ(1, manager.getNumCheckpoints());
5383+
ASSERT_EQ(1, manager.getNumOpenChkItems());
5384+
5385+
// Historical items received within the same snapshot
5386+
const auto collection = CollectionEntry::historical;
52735387
const std::string key("key");
52745388
const std::string value("value");
5275-
for (size_t seqno = 1; seqno <= 3; ++seqno) {
5389+
const size_t numItems = 3;
5390+
for (size_t seqno = initialHighSeqno + 1;
5391+
seqno <= initialHighSeqno + numItems;
5392+
++seqno) {
52765393
EXPECT_EQ(cb::engine_errc::success,
5277-
stream->messageReceived(makeMutationConsumerMessage(
5278-
seqno, vbid, value, opaque, key)));
5394+
stream->messageReceived(
5395+
makeMutationConsumerMessage(1 /*opaque*/,
5396+
seqno,
5397+
vbid,
5398+
value,
5399+
key,
5400+
collection.getId())));
52795401
}
52805402

5281-
EXPECT_EQ(3, vb.getHighSeqno());
5403+
// Important: In this scenario historical mutations are queued into the
5404+
// Initial disk checkpoint
5405+
ASSERT_EQ(CheckpointType::InitialDisk, manager.getOpenCheckpointType());
5406+
ASSERT_EQ(CheckpointHistorical::Yes, manager.getOpenCheckpointHistorical());
5407+
5408+
EXPECT_EQ(initialHighSeqno + numItems, vb.getHighSeqno());
52825409
EXPECT_EQ(1, manager.getNumCheckpoints());
5283-
EXPECT_EQ(3, manager.getNumOpenChkItems());
5410+
EXPECT_EQ(initialHighSeqno + numItems, manager.getNumOpenChkItems());
5411+
5412+
// All duplicates persisted
5413+
flush_vbucket_to_disk(vbid, numItems);
5414+
5415+
// Item count doesn't account for historical revisions
5416+
EXPECT_EQ(1, vb.getNumTotalItems());
52845417
}
52855418

52865419
INSTANTIATE_TEST_SUITE_P(Persistent,

engines/ep/tests/module_tests/dcp_stream_test.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,4 +229,15 @@ class CDCActiveStreamTest : public STActiveStreamPersistentTest {
229229
* connections.
230230
*/
231231
class CDCPassiveStreamTest : public STPassiveStreamPersistentTest {
232+
protected:
233+
void SetUp() override;
234+
235+
/**
236+
* Create a historical collection at replica.
237+
* SysEvent received in a snapshot with range defined by the user.
238+
*
239+
* @param snapStart
240+
* @param snapEnd
241+
*/
242+
void createHistoricalCollection(uint64_t snapStart, uint64_t snapEnd);
232243
};

engines/ep/tests/module_tests/dcp_utils.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,32 @@ std::unique_ptr<MutationConsumerMessage> makeMutationConsumerMessage(
127127
nullptr,
128128
cb::mcbp::DcpStreamId{});
129129
}
130+
131+
std::unique_ptr<MutationConsumerMessage> makeMutationConsumerMessage(
132+
uint64_t opaque,
133+
uint64_t seqno,
134+
Vbid vbid,
135+
const std::string& value,
136+
const std::string& key,
137+
CollectionID cid) {
138+
queued_item qi(new Item(makeStoredDocKey(key, cid),
139+
0,
140+
0,
141+
value.c_str(),
142+
value.size(),
143+
PROTOCOL_BINARY_RAW_BYTES,
144+
0,
145+
seqno,
146+
vbid,
147+
1));
148+
return std::make_unique<MutationConsumerMessage>(
149+
std::move(qi),
150+
opaque,
151+
IncludeValue::Yes,
152+
IncludeXattrs::Yes,
153+
IncludeDeleteTime::No,
154+
IncludeDeletedUserXattrs::Yes,
155+
DocKeyEncodesCollectionId::No,
156+
nullptr,
157+
cb::mcbp::DcpStreamId{});
158+
}

0 commit comments

Comments
 (0)