Skip to content

Commit 5629c4e

Browse files
committed
MB-54553: Propagate CheckpointHistorical to magma::saveDocs()
Passed when committing the flush-batch. Primary usage is to inform magma in the case where the historical data stream has been interrupted. * Note * Magma needs that the flag is always passed to the storage. 7.2 active nodes always create checkpoints with Historical::Yes. While 7.2 replicas set the Historical flag in checkpoints depending on what is received in snapshot Markers. Change-Id: Ia1a3ff07130a77e2cc919334adee1876c0092c9e Reviewed-on: https://review.couchbase.org/c/kv_engine/+/185333 Well-Formed: Restriction Checker Reviewed-by: Jim Walker <[email protected]> Tested-by: Paolo Cocchi <[email protected]>
1 parent 108151d commit 5629c4e

File tree

11 files changed

+61
-21
lines changed

11 files changed

+61
-21
lines changed

engines/ep/src/checkpoint_manager.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ CheckpointManager::CheckpointManager(EPStats& st,
6565
{},
6666
maxPrepareSeqno,
6767
CheckpointType::Memory,
68-
CheckpointHistorical::No);
68+
CheckpointHistorical::Yes);
6969

7070
if (checkpointConfig.isPersistenceEnabled()) {
7171
// Register the persistence cursor
@@ -137,7 +137,7 @@ void CheckpointManager::addNewCheckpoint(
137137
maxVisibleSeqno,
138138
{},
139139
CheckpointType::Memory,
140-
CheckpointHistorical::No);
140+
CheckpointHistorical::Yes);
141141
}
142142

143143
void CheckpointManager::addNewCheckpoint(
@@ -1170,7 +1170,7 @@ void CheckpointManager::clear(const std::lock_guard<std::mutex>& lh,
11701170
{},
11711171
0, // HPS=0 because we have correct val on disk and in PDM
11721172
CheckpointType::Memory,
1173-
CheckpointHistorical::No);
1173+
CheckpointHistorical::Yes);
11741174
resetCursors();
11751175
}
11761176

engines/ep/src/ep_bucket.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,8 @@ EPBucket::FlushResult EPBucket::flushVBucket_UNLOCKED(LockedVBucketPtr vb) {
488488
writeOp = WriteOperation::Insert;
489489
}
490490

491-
VB::Commit commitData(vb->getManifest(), writeOp, vbstate, callback);
491+
VB::Commit commitData(
492+
vb->getManifest(), writeOp, vbstate, callback, toFlush.historical);
492493

493494
vbucket_state& proposedVBState = commitData.proposedVBState;
494495

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -750,7 +750,8 @@ bool MagmaKVStore::commit(std::unique_ptr<TransactionContext> txnCtx,
750750
preFlushHook();
751751

752752
// Flush all documents to disk
753-
auto errCode = saveDocs(ctx, commitData, kvctx);
753+
auto errCode = saveDocs(
754+
ctx, commitData, kvctx, toHistoryMode(commitData.historical));
754755
if (errCode != static_cast<int>(cb::engine_errc::success)) {
755756
logger->warn("MagmaKVStore::commit: saveDocs {} errCode:{}",
756757
txnCtx->vbid,
@@ -1290,7 +1291,8 @@ GetValue MagmaKVStore::makeGetValue(Vbid vb,
12901291

12911292
int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
12921293
VB::Commit& commitData,
1293-
kvstats_ctx& kvctx) {
1294+
kvstats_ctx& kvctx,
1295+
magma::Magma::HistoryMode historyMode) {
12941296
uint64_t ninserts = 0;
12951297
uint64_t ndeletes = 0;
12961298

@@ -1541,7 +1543,8 @@ int MagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
15411543
writeOps,
15421544
kvstoreRevList[getCacheSlot(vbid)],
15431545
writeDocsCB,
1544-
postWriteDocsCB);
1546+
postWriteDocsCB,
1547+
historyMode);
15451548

15461549
saveDocsPostWriteDocsHook();
15471550

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,9 @@ class MagmaKVStore : public KVStore {
593593

594594
virtual int saveDocs(MagmaKVStoreTransactionContext& txnCtx,
595595
VB::Commit& commitData,
596-
kvstats_ctx& kvctx);
596+
kvstats_ctx& kvctx,
597+
magma::Magma::HistoryMode historyMode =
598+
magma::Magma::HistoryMode::Disabled);
597599

598600
void commitCallback(MagmaKVStoreTransactionContext& txnCtx,
599601
int status,
@@ -768,6 +770,17 @@ class MagmaKVStore : public KVStore {
768770
const magma::Slice& valSlice,
769771
std::function<magma::Status(magma::Slice&)> valueRead) const;
770772

773+
static magma::Magma::HistoryMode toHistoryMode(
774+
CheckpointHistorical historical) {
775+
switch (historical) {
776+
case CheckpointHistorical::No:
777+
return magma::Magma::HistoryMode::Disabled;
778+
case CheckpointHistorical::Yes:
779+
return magma::Magma::HistoryMode::Enabled;
780+
}
781+
folly::assume_unreachable();
782+
}
783+
771784
MagmaKVStoreConfig& configuration;
772785

773786
/**

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,8 @@ magma::Status MagmaMemoryTrackingProxy::WriteDocs(
463463
const std::vector<magma::Magma::WriteOperation>& docOperations,
464464
const magma::Magma::KVStoreRevision kvsRev,
465465
const magma::Magma::WriteDocsCallback docCallback,
466-
const magma::Magma::PostWriteDocsCallback postCallback) {
466+
const magma::Magma::PostWriteDocsCallback postCallback,
467+
const magma::Magma::HistoryMode historyMode) {
467468
magma::Magma::WriteDocsCallback wrappedDocCallback = nullptr;
468469
magma::Magma::PostWriteDocsCallback wrappedPostCallback = nullptr;
469470

@@ -491,7 +492,8 @@ magma::Status MagmaMemoryTrackingProxy::WriteDocs(
491492
docOperations,
492493
kvsRev,
493494
wrappedDocCallback,
494-
wrappedPostCallback);
495+
wrappedPostCallback,
496+
historyMode);
495497
}
496498

497499
magma::Status MagmaMemoryTrackingProxy::NewCheckpoint(

engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,16 @@ class MagmaMemoryTrackingProxy {
207207
void SetNumThreads(magma::Magma::ThreadType threadType, size_t nThreads);
208208
magma::Status Sync(bool flushAll);
209209
magma::Status SyncKVStore(const magma::Magma::KVStoreID kvID);
210+
210211
magma::Status WriteDocs(
211212
const magma::Magma::KVStoreID kvID,
212213
const std::vector<magma::Magma::WriteOperation>& docOperations,
213214
const magma::Magma::KVStoreRevision kvsRev = 1,
214215
const magma::Magma::WriteDocsCallback docCallback = nullptr,
215-
const magma::Magma::PostWriteDocsCallback postCallback = nullptr);
216+
const magma::Magma::PostWriteDocsCallback postCallback = nullptr,
217+
const magma::Magma::HistoryMode historyMode =
218+
magma::Magma::HistoryMode::Disabled);
219+
216220
magma::Status NewCheckpoint(const magma::Magma::KVStoreID kvID);
217221
magma::Status StopBGCompaction(const magma::Magma::KVStoreID kvID);
218222
magma::Status ResumeBGCompaction(const magma::Magma::KVStoreID kvID);

engines/ep/src/vb_commit.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@ namespace VB {
1717
Commit::Commit(Collections::VB::Manifest& manifest,
1818
WriteOperation writeOp,
1919
vbucket_state vbs,
20-
SysErrorCallback sysErrorCallback)
20+
SysErrorCallback sysErrorCallback,
21+
CheckpointHistorical historical)
2122
: collections(manifest),
2223
writeOp(writeOp),
2324
proposedVBState(std::move(vbs)),
24-
sysErrorCallback(std::move(sysErrorCallback)) {
25+
sysErrorCallback(std::move(sysErrorCallback)),
26+
historical(historical) {
2527
}
2628
} // namespace VB

engines/ep/src/vb_commit.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class Commit {
3939
explicit Commit(Collections::VB::Manifest& manifest,
4040
WriteOperation writeOp = WriteOperation::Upsert,
4141
vbucket_state vbs = {},
42-
SysErrorCallback sysErrorCallback = {});
42+
SysErrorCallback sysErrorCallback = {},
43+
CheckpointHistorical historical = CheckpointHistorical::No);
4344

4445
/// Object for updating the collection's meta-data during commit
4546
Collections::VB::Flush collections;
@@ -60,6 +61,12 @@ class Commit {
6061
* Allows EP to take decisions on how to react to the failure.
6162
*/
6263
SysErrorCallback sysErrorCallback;
64+
65+
/**
66+
* Tells the storage whether the flush-batch is part of a seamless sequence
67+
* of historical data.
68+
*/
69+
CheckpointHistorical historical;
6370
};
6471

6572
} // end namespace VB

engines/ep/tests/mock/mock_magma_kvstore.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ KVStoreIface::ReadVBStateResult MockMagmaKVStore::readVBStateFromDisk(
3535

3636
int MockMagmaKVStore::saveDocs(MagmaKVStoreTransactionContext& txnCtx,
3737
VB::Commit& commitData,
38-
kvstats_ctx& kvctx) {
38+
kvstats_ctx& kvctx,
39+
magma::Magma::HistoryMode historyMode) {
3940
if (saveDocsErrorInjector) {
4041
return saveDocsErrorInjector(commitData, kvctx);
4142
}
4243

43-
return MagmaKVStore::saveDocs(txnCtx, commitData, kvctx);
44+
return MagmaKVStore::saveDocs(txnCtx, commitData, kvctx, historyMode);
4445
}
4546

4647
bool MockMagmaKVStore::snapshotVBucket(Vbid vbid,

engines/ep/tests/mock/mock_magma_kvstore.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ class MockMagmaKVStore : public MagmaKVStore {
5454

5555
int saveDocs(MagmaKVStoreTransactionContext& txnCtx,
5656
VB::Commit& commitData,
57-
kvstats_ctx& kvctx) override;
57+
kvstats_ctx& kvctx,
58+
magma::Magma::HistoryMode historyMode =
59+
magma::Magma::HistoryMode::Disabled) override;
5860

5961
bool snapshotVBucket(Vbid vbid, const vbucket_state& newVBState) override;
6062

0 commit comments

Comments
 (0)