Skip to content

Commit a3707d8

Browse files
committed
MB-54729: Add historyStartSeqno to ScanContext
Add a new member, historyStartSeqno to ScanContext for later use in DCP backfill when "change_stream=true". This will always be 0 for a KVStore which reports: StorageProperties::HistoryRetentionAvailable::No For a KVStore which reports: StorageProperties::HistoryRetentionAvailable::Yes This will be set to: 0: If no history is configured (retention_bytes=0) >0: When history is available, this value represents the lowest seqno at which a continuous change stream could be made available. This commit is not dependent upon Magma's changes to add history, but adds the ability for MockMagmaKVStore to "inject" a history start. Change-Id: I66db1d84af54f4a999f260d30b356a9a149ddf31
1 parent 86fb4e3 commit a3707d8

File tree

4 files changed

+206
-5
lines changed

4 files changed

+206
-5
lines changed

engines/ep/src/kvstore/kvstore.cc

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,31 @@ ScanContext::ScanContext(
6868
Expects(lookup != nullptr);
6969
}
7070

71+
ScanContext::ScanContext(
72+
Vbid vbid,
73+
std::unique_ptr<KVFileHandle> handle,
74+
DocumentFilter docFilter,
75+
ValueFilter valFilter,
76+
std::unique_ptr<StatusCallback<GetValue>> cb,
77+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
78+
const std::vector<Collections::KVStore::DroppedCollection>&
79+
droppedCollections,
80+
uint64_t maxSeqno,
81+
uint64_t historyStartSeqno)
82+
: vbid(vbid),
83+
handle(std::move(handle)),
84+
docFilter(docFilter),
85+
valFilter(valFilter),
86+
logger(getGlobalBucketLogger().get()),
87+
collectionsContext(droppedCollections),
88+
maxSeqno(maxSeqno),
89+
historyStartSeqno(historyStartSeqno),
90+
callback(std::move(cb)),
91+
lookup(std::move(cl)) {
92+
Expects(callback != nullptr);
93+
Expects(lookup != nullptr);
94+
}
95+
7196
BySeqnoScanContext::BySeqnoScanContext(
7297
std::unique_ptr<StatusCallback<GetValue>> cb,
7398
std::unique_ptr<StatusCallback<CacheLookup>> cl,
@@ -100,6 +125,40 @@ BySeqnoScanContext::BySeqnoScanContext(
100125
timestamp(std::move(timestamp)) {
101126
}
102127

128+
BySeqnoScanContext::BySeqnoScanContext(
129+
std::unique_ptr<StatusCallback<GetValue>> cb,
130+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
131+
Vbid vb,
132+
std::unique_ptr<KVFileHandle> handle,
133+
uint64_t start,
134+
uint64_t end,
135+
uint64_t purgeSeqno,
136+
uint64_t historyStartSeqno,
137+
DocumentFilter _docFilter,
138+
ValueFilter _valFilter,
139+
uint64_t _documentCount,
140+
const vbucket_state& vbucketState,
141+
const std::vector<Collections::KVStore::DroppedCollection>&
142+
droppedCollections,
143+
std::optional<uint64_t> timestamp)
144+
145+
: ScanContext(vb,
146+
std::move(handle),
147+
_docFilter,
148+
_valFilter,
149+
std::move(cb),
150+
std::move(cl),
151+
droppedCollections,
152+
end,
153+
historyStartSeqno),
154+
startSeqno(start),
155+
purgeSeqno(purgeSeqno),
156+
documentCount(_documentCount),
157+
maxVisibleSeqno(vbucketState.maxVisibleSeqno),
158+
persistedCompletedSeqno(vbucketState.persistedCompletedSeqno),
159+
timestamp(std::move(timestamp)) {
160+
}
161+
103162
ByIdScanContext::ByIdScanContext(
104163
std::unique_ptr<StatusCallback<GetValue>> cb,
105164
std::unique_ptr<StatusCallback<CacheLookup>> cl,
@@ -123,6 +182,31 @@ ByIdScanContext::ByIdScanContext(
123182
lastReadKey(nullptr, 0) {
124183
}
125184

185+
ByIdScanContext::ByIdScanContext(
186+
std::unique_ptr<StatusCallback<GetValue>> cb,
187+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
188+
Vbid vb,
189+
std::unique_ptr<KVFileHandle> handle,
190+
std::vector<ByIdRange> ranges,
191+
DocumentFilter _docFilter,
192+
ValueFilter _valFilter,
193+
const std::vector<Collections::KVStore::DroppedCollection>&
194+
droppedCollections,
195+
uint64_t maxSeqno,
196+
uint64_t historyStartSeqno)
197+
: ScanContext(vb,
198+
std::move(handle),
199+
_docFilter,
200+
_valFilter,
201+
std::move(cb),
202+
std::move(cl),
203+
droppedCollections,
204+
maxSeqno,
205+
historyStartSeqno),
206+
ranges(std::move(ranges)),
207+
lastReadKey(nullptr, 0) {
208+
}
209+
126210
bool ByIdRange::operator==(const ByIdRange& other) const {
127211
return startKey == other.startKey && endKey == other.endKey &&
128212
rangeScanSuccess == other.rangeScanSuccess;

engines/ep/src/kvstore/kvstore.h

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,17 @@ class ScanContext {
394394
droppedCollections,
395395
uint64_t maxSeqno);
396396

397+
ScanContext(Vbid vbid,
398+
std::unique_ptr<KVFileHandle> handle,
399+
DocumentFilter docFilter,
400+
ValueFilter valFilter,
401+
std::unique_ptr<StatusCallback<GetValue>> cb,
402+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
403+
const std::vector<Collections::KVStore::DroppedCollection>&
404+
droppedCollections,
405+
uint64_t maxSeqno,
406+
uint64_t historyStartSeqno);
407+
397408
virtual ~ScanContext() = default;
398409

399410
virtual const StatusCallback<GetValue>& getValueCallback() const {
@@ -412,14 +423,15 @@ class ScanContext {
412423
return *lookup;
413424
}
414425

415-
const Vbid vbid;
426+
const Vbid vbid{0};
416427
uint64_t lastReadSeqno{0};
417428
std::unique_ptr<KVFileHandle> handle;
418-
const DocumentFilter docFilter;
419-
const ValueFilter valFilter;
420-
BucketLogger* logger;
429+
const DocumentFilter docFilter{DocumentFilter::ALL_ITEMS};
430+
const ValueFilter valFilter{ValueFilter::KEYS_ONLY};
431+
BucketLogger* logger{nullptr};
421432
const Collections::VB::ScanContext collectionsContext;
422-
uint64_t maxSeqno;
433+
uint64_t maxSeqno{0};
434+
uint64_t historyStartSeqno{0};
423435

424436
/**
425437
* Cumulative count of bytes read from disk during this scan. Counts
@@ -456,6 +468,23 @@ class BySeqnoScanContext : public ScanContext {
456468
droppedCollections,
457469
std::optional<uint64_t> timestamp = {});
458470

471+
BySeqnoScanContext(
472+
std::unique_ptr<StatusCallback<GetValue>> cb,
473+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
474+
Vbid vb,
475+
std::unique_ptr<KVFileHandle> handle,
476+
uint64_t start,
477+
uint64_t end,
478+
uint64_t purgeSeqno,
479+
uint64_t historyStartSeqno,
480+
DocumentFilter _docFilter,
481+
ValueFilter _valFilter,
482+
uint64_t _documentCount,
483+
const vbucket_state& vbucketState,
484+
const std::vector<Collections::KVStore::DroppedCollection>&
485+
droppedCollections,
486+
std::optional<uint64_t> timestamp = {});
487+
459488
const uint64_t startSeqno;
460489
const uint64_t purgeSeqno;
461490
const uint64_t documentCount;
@@ -517,6 +546,18 @@ class ByIdScanContext : public ScanContext {
517546
const std::vector<Collections::KVStore::DroppedCollection>&
518547
droppedCollections,
519548
uint64_t maxSeqno);
549+
550+
ByIdScanContext(std::unique_ptr<StatusCallback<GetValue>> cb,
551+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
552+
Vbid vb,
553+
std::unique_ptr<KVFileHandle> handle,
554+
std::vector<ByIdRange> ranges,
555+
DocumentFilter _docFilter,
556+
ValueFilter _valFilter,
557+
const std::vector<Collections::KVStore::DroppedCollection>&
558+
droppedCollections,
559+
uint64_t maxSeqno,
560+
uint64_t historyStartSeqno);
520561
std::vector<ByIdRange> ranges;
521562
// Key should be set by KVStore when a scan must be paused, this is where
522563
// a scan can resume from

engines/ep/tests/mock/mock_magma_kvstore.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,44 @@ magma::Status MockMagmaKVStore::runImplicitCompactKVStore(Vbid vbid) {
9090
magma::Status MockMagmaKVStore::newCheckpoint(Vbid vbid) {
9191
return magma->NewCheckpoint(vbid.get());
9292
}
93+
94+
std::unique_ptr<BySeqnoScanContext> MockMagmaKVStore::initBySeqnoScanContext(
95+
std::unique_ptr<StatusCallback<GetValue>> cb,
96+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
97+
Vbid vbid,
98+
uint64_t startSeqno,
99+
DocumentFilter options,
100+
ValueFilter valOptions,
101+
SnapshotSource source,
102+
std::unique_ptr<KVFileHandle> fileHandle) const {
103+
auto scanContext =
104+
MagmaKVStore::initBySeqnoScanContext(std::move(cb),
105+
std::move(cl),
106+
vbid,
107+
startSeqno,
108+
options,
109+
valOptions,
110+
source,
111+
std::move(fileHandle));
112+
113+
if (historyStartSeqno) {
114+
scanContext->historyStartSeqno = historyStartSeqno.value();
115+
}
116+
return scanContext;
117+
}
118+
119+
std::unique_ptr<ByIdScanContext> MockMagmaKVStore::initByIdScanContext(
120+
std::unique_ptr<StatusCallback<GetValue>> cb,
121+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
122+
Vbid vbid,
123+
const std::vector<ByIdRange>& ranges,
124+
DocumentFilter options,
125+
ValueFilter valOptions) const {
126+
auto scanContext = MagmaKVStore::initByIdScanContext(
127+
std::move(cb), std::move(cl), vbid, ranges, options, valOptions);
128+
129+
if (historyStartSeqno) {
130+
scanContext->historyStartSeqno = historyStartSeqno.value();
131+
}
132+
return scanContext;
133+
}

engines/ep/tests/mock/mock_magma_kvstore.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,34 @@ class MockMagmaKVStore : public MagmaKVStore {
1919
public:
2020
explicit MockMagmaKVStore(MagmaKVStoreConfig& config);
2121

22+
/**
23+
* See base-class comments for usage. However this "mock" version will
24+
* override the ScanContext::historyStartSeqno if this class defines
25+
* historyStartSeqno (see historyStartSeqno member below).
26+
*/
27+
std::unique_ptr<BySeqnoScanContext> initBySeqnoScanContext(
28+
std::unique_ptr<StatusCallback<GetValue>> cb,
29+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
30+
Vbid vbid,
31+
uint64_t startSeqno,
32+
DocumentFilter options,
33+
ValueFilter valOptions,
34+
SnapshotSource source,
35+
std::unique_ptr<KVFileHandle> fileHandle = nullptr) const override;
36+
37+
/**
38+
* See base-class comments for usage. However this "mock" version will
39+
* override the ScanContext::historyStartSeqno if this class defines
40+
* historyStartSeqno (see historyStartSeqno member below).
41+
*/
42+
std::unique_ptr<ByIdScanContext> initByIdScanContext(
43+
std::unique_ptr<StatusCallback<GetValue>> cb,
44+
std::unique_ptr<StatusCallback<CacheLookup>> cl,
45+
Vbid vbid,
46+
const std::vector<ByIdRange>& ranges,
47+
DocumentFilter options,
48+
ValueFilter valOptions) const override;
49+
2250
ReadVBStateResult readVBStateFromDisk(Vbid vbid);
2351

2452
ReadVBStateResult readVBStateFromDisk(
@@ -75,6 +103,13 @@ class MockMagmaKVStore : public MagmaKVStore {
75103
std::function<bool()> snapshotVBucketErrorInjector;
76104

77105
StorageProperties storageProperties;
106+
107+
/**
108+
* The historyStartSeqno in this class when set will override the value
109+
* MagmaKVStore sets. This allows arbitrary placement of the history range
110+
* for testing of various two-phase backfills.
111+
*/
112+
std::optional<uint64_t> historyStartSeqno;
78113
};
79114

80115
#endif

0 commit comments

Comments
 (0)