Skip to content

Commit 9cb371e

Browse files
committed
MB-54850: Propagate the MARKER_FLAG_HISTORY information to Checkpoint
Necessary for then passing that information to magma at persistence as a batch-flag. Change-Id: I093d5541e6db404c0eb3e8172d02f9833f6a0e59 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/184977 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]>
1 parent aed6bcf commit 9cb371e

File tree

6 files changed

+57
-22
lines changed

6 files changed

+57
-22
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ Checkpoint::Checkpoint(CheckpointManager& manager,
5757
std::optional<uint64_t> highCompletedSeqno,
5858
uint64_t highPreparedSeqno,
5959
Vbid vbid,
60-
CheckpointType checkpointType)
60+
CheckpointType checkpointType,
61+
CheckpointHistorical historical)
6162
: manager(&manager),
6263
stats(st),
6364
checkpointId(id),
@@ -76,7 +77,8 @@ Checkpoint::Checkpoint(CheckpointManager& manager,
7677
queuedItemsMemUsage(st, &manager.memUsage),
7778
queueMemOverhead(st, &manager.memUsage),
7879
checkpointType(checkpointType),
79-
highCompletedSeqno(std::move(highCompletedSeqno)) {
80+
highCompletedSeqno(std::move(highCompletedSeqno)),
81+
historical(historical) {
8082
auto& core = stats.coreLocal.get();
8183
core->memOverhead.fetch_add(sizeof(Checkpoint));
8284
core->numCheckpoints++;

engines/ep/src/checkpoint.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ class Checkpoint {
284284
std::optional<uint64_t> highCompletedSeqno,
285285
uint64_t highPreparedSeqno,
286286
Vbid vbid,
287-
CheckpointType checkpointType);
287+
CheckpointType checkpointType,
288+
CheckpointHistorical historical);
288289

289290
~Checkpoint();
290291

@@ -761,5 +762,9 @@ class Checkpoint {
761762
// de-duplication.
762763
std::optional<uint64_t> maxDeletedRevSeqno;
763764

765+
// Whether the snapshot stored in this checkpoint is part of a historical
766+
// sequence of mutation.
767+
const CheckpointHistorical historical;
768+
764769
friend std::ostream& operator <<(std::ostream& os, const Checkpoint& m);
765770
};

engines/ep/src/checkpoint_manager.cc

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ CheckpointManager::CheckpointManager(EPStats& st,
6464
maxVisibleSeqno,
6565
{},
6666
maxPrepareSeqno,
67-
CheckpointType::Memory);
67+
CheckpointType::Memory,
68+
CheckpointHistorical::No);
6869

6970
if (checkpointConfig.isPersistenceEnabled()) {
7071
// Register the persistence cursor
@@ -130,7 +131,8 @@ void CheckpointManager::addNewCheckpoint(
130131
lastBySeqno + 1,
131132
maxVisibleSeqno,
132133
{},
133-
CheckpointType::Memory);
134+
CheckpointType::Memory,
135+
CheckpointHistorical::No);
134136
}
135137

136138
void CheckpointManager::addNewCheckpoint(
@@ -139,7 +141,8 @@ void CheckpointManager::addNewCheckpoint(
139141
uint64_t snapEndSeqno,
140142
uint64_t visibleSnapEnd,
141143
std::optional<uint64_t> highCompletedSeqno,
142-
CheckpointType checkpointType) {
144+
CheckpointType checkpointType,
145+
CheckpointHistorical historical) {
143146
// First, we must close the open checkpoint.
144147
auto* const oldOpenCkptPtr = checkpointList.back().get();
145148
auto& oldOpenCkpt = *oldOpenCkptPtr;
@@ -173,7 +176,8 @@ void CheckpointManager::addNewCheckpoint(
173176
visibleSnapEnd,
174177
highCompletedSeqno,
175178
hps,
176-
checkpointType);
179+
checkpointType,
180+
historical);
177181

178182
// If cursors reached to the end of its current checkpoint, move it to the
179183
// next checkpoint. That is done to help in making checkpoints eligible for
@@ -219,7 +223,8 @@ void CheckpointManager::addOpenCheckpoint(
219223
uint64_t visibleSnapEnd,
220224
std::optional<uint64_t> highCompletedSeqno,
221225
uint64_t highPreparedSeqno,
222-
CheckpointType checkpointType) {
226+
CheckpointType checkpointType,
227+
CheckpointHistorical historical) {
223228
Expects(checkpointList.empty() ||
224229
checkpointList.back()->getState() ==
225230
checkpoint_state::CHECKPOINT_CLOSED);
@@ -249,7 +254,8 @@ void CheckpointManager::addOpenCheckpoint(
249254
highCompletedSeqno,
250255
highPreparedSeqno,
251256
vb.getId(),
252-
checkpointType);
257+
checkpointType,
258+
historical);
253259
// Add an empty-item into the new checkpoint.
254260
// We need this because every CheckpointCursor will point to this empty-item
255261
// at creation. So, the cursor will point at the first actual non-meta item
@@ -1157,7 +1163,8 @@ void CheckpointManager::clear(const std::lock_guard<std::mutex>& lh,
11571163
maxVisibleSeqno,
11581164
{},
11591165
0, // HPS=0 because we have correct val on disk and in PDM
1160-
CheckpointType::Memory);
1166+
CheckpointType::Memory,
1167+
CheckpointHistorical::No);
11611168
resetCursors();
11621169
}
11631170

@@ -1288,7 +1295,8 @@ void CheckpointManager::createSnapshot(
12881295
uint64_t snapEndSeqno,
12891296
std::optional<uint64_t> highCompletedSeqno,
12901297
CheckpointType checkpointType,
1291-
uint64_t visibleSnapEnd) {
1298+
uint64_t visibleSnapEnd,
1299+
CheckpointHistorical historical) {
12921300
if (isDiskCheckpointType(checkpointType)) {
12931301
Expects(highCompletedSeqno.has_value());
12941302
}
@@ -1310,7 +1318,8 @@ void CheckpointManager::createSnapshot(
13101318
snapEndSeqno,
13111319
visibleSnapEnd,
13121320
highCompletedSeqno,
1313-
checkpointType);
1321+
checkpointType,
1322+
historical);
13141323
}
13151324

13161325
void CheckpointManager::extendOpenCheckpoint(uint64_t snapEnd,

engines/ep/src/checkpoint_manager.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -482,11 +482,13 @@ class CheckpointManager {
482482
*/
483483
bool isEligibleForCheckpointRemovalAfterPersistence() const;
484484

485-
void createSnapshot(uint64_t snapStartSeqno,
486-
uint64_t snapEndSeqno,
487-
std::optional<uint64_t> highCompletedSeqno,
488-
CheckpointType checkpointType,
489-
uint64_t maxVisibleSnapEnd);
485+
void createSnapshot(
486+
uint64_t snapStartSeqno,
487+
uint64_t snapEndSeqno,
488+
std::optional<uint64_t> highCompletedSeqno,
489+
CheckpointType checkpointType,
490+
uint64_t maxVisibleSnapEnd,
491+
CheckpointHistorical historical = CheckpointHistorical::No);
490492

491493
/**
492494
* Extend the open checkpoint to contain more mutations. Allowed only for
@@ -760,13 +762,15 @@ class CheckpointManager {
760762
* @param highCompletedSeqno optional SyncRep HCS to be flushed to disk
761763
* @param checkpointType is the checkpoint created from a replica receiving
762764
* a disk snapshot?
765+
* @param historical Whether this checkpoint stores an historical snapshot
763766
*/
764767
void addNewCheckpoint(const std::lock_guard<std::mutex>& lh,
765768
uint64_t snapStartSeqno,
766769
uint64_t snapEndSeqno,
767770
uint64_t visibleSnapEnd,
768771
std::optional<uint64_t> highCompletedSeqno,
769-
CheckpointType checkpointType);
772+
CheckpointType checkpointType,
773+
CheckpointHistorical historical);
770774

771775
/**
772776
* Closes the current open checkpoint and adds a new open checkpoint to
@@ -787,13 +791,15 @@ class CheckpointManager {
787791
* @param highCompletedSeqno the SyncRepl HCS to be flushed to disk
788792
* @param checkpointType is the checkpoint created from a replica receiving
789793
* a disk snapshot?
794+
* @param historical Whether this checkpoint stores an historical snapshot
790795
*/
791796
void addOpenCheckpoint(uint64_t snapStart,
792797
uint64_t snapEnd,
793798
uint64_t visibleSnapEnd,
794799
std::optional<uint64_t> highCompletedSeqno,
795800
uint64_t highPreparedSeqno,
796-
CheckpointType checkpointType);
801+
CheckpointType checkpointType,
802+
CheckpointHistorical historical);
797803

798804
/**
799805
* Moves the cursor to the empty item into the next checkpoint (if any).

engines/ep/src/dcp/passive_stream.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,10 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
11311131
? CheckpointType::Disk
11321132
: CheckpointType::Memory;
11331133

1134+
const auto historical = marker->getFlags() & MARKER_FLAG_HISTORY
1135+
? CheckpointHistorical::Yes
1136+
: CheckpointHistorical::No;
1137+
11341138
// Check whether the snapshot can be considered as an initial disk
11351139
// checkpoint for the replica.
11361140
if (checkpointType == CheckpointType::Disk && vb->getHighSeqno() == 0) {
@@ -1180,7 +1184,8 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
11801184
cur_snapshot_end.load(),
11811185
hcs,
11821186
checkpointType,
1183-
visibleSeq);
1187+
visibleSeq,
1188+
historical);
11841189
} else {
11851190
// Case: receiving any type of snapshot (Disk/Memory).
11861191

@@ -1189,7 +1194,8 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
11891194
cur_snapshot_end.load(),
11901195
hcs,
11911196
checkpointType,
1192-
visibleSeq);
1197+
visibleSeq,
1198+
historical);
11931199
} else {
11941200
// MB-42780: In general we cannot merge multiple snapshots into
11951201
// the same checkpoint. The only exception is for when replica
@@ -1207,7 +1213,8 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
12071213
cur_snapshot_end.load(),
12081214
hcs,
12091215
checkpointType,
1210-
visibleSeq);
1216+
visibleSeq,
1217+
historical);
12111218
}
12121219
}
12131220
}

engines/ep/src/ep_types.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,12 @@ enum class CheckpointType : uint8_t {
110110
// Returns true if given type is either a Disk checkpoint or its subtype.
111111
bool isDiskCheckpointType(CheckpointType type);
112112

113+
/**
114+
* Encodes whether the snapshot stored in a checkpoint is part of a historical
115+
* sequence of mutation.
116+
*/
117+
enum class CheckpointHistorical : uint8_t { No, Yes };
118+
113119
enum class ConflictResolutionMode {
114120
/// Resolve conflicts based on document revision id (revid).
115121
RevisionId,

0 commit comments

Comments
 (0)