Skip to content

Commit f95f2e7

Browse files
committed
MB-54729: Enable history scan for CDC backfill
Replace the todo markers with code that now utilises the magma history API - this now means scanAllVersions for example is hooked into the magma history scanning API. Add new tests that validate multiple versions can be stored and returned. Also required are changes to unit tests to respect new expectation checks that occur in magma - primarily that flushing writes ordered batches - this is only a problem for tests which bypass the flusher and call KVStore directly. **** ISSUES **** ep-engine_ep_unit_tests does not pass: 1) Exception from magma MagmaKVStoreRollbackTest.Rollback hits the following exception GSL: Precondition failure: 'levelSize >= compactionState[level].history.Size' at /Users/jimwalker/Code/couchbase/neo/magma/lsm/lsm_tree.cc:895 2) Seg-fault in magma Seen in a number of tests, 1 example: CollectionsDcpEphemeralOrPersistent/CollectionsDcpParameterizedTest.DefaultCollectionDropped/persistent_magma_value_only Process 78731 stopped * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] 72 } 73 74 Slice DocSequenceBuffer::GetKey() { -> 75 seqFmt.Set(sortedList[offset]->seqno); 76 return seqFmt.Encode(); 77 } 78 * thread couchbase#1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) * frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] frame couchbase#1: 0x0000000101361e2e ep-engine_ep_unit_tests`magma::mvccIteratorAdaptor::GetKey(this=0x0000000118536c00) at mvcc.h:249:25 [opt] frame couchbase#2: 0x000000010132b688 ep-engine_ep_unit_tests`magma::IteratorWithFilter::filterKeys(this=0x0000000118128350) at iterator.cc:214:32 [opt] frame couchbase#3: 0x000000010132de5b ep-engine_ep_unit_tests`magma::KVReader::ReadKVs(this=0x00007ff7bfefd550) at common.cc:70:19 [opt] frame couchbase#4: 0x0000000101378f63 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, w=0x00007ff7bfefd890, itr=0x0000000118128350, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefd860)>) at lsm_tree.cc:719:15 [opt] frame couchbase#5: 0x0000000101376ee8 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, appendMode=<unavailable>, itr=0x0000000118128350, sizeEstimate=<unavailable>, maxSn=10, stopFn=function<bool (const magma::Slice &)> @ 0x00007ff7bfefdb60)>) at lsm_tree.cc:682:17 [opt] frame couchbase#6: 0x00000001013761b2 ep-engine_ep_unit_tests`magma::LSMTree::writeMemtable(this=0x000000011855a820, memtable=0x000000011854c7a0) at lsm_tree.cc:449:21 [opt] frame #7: 0x000000010137753f ep-engine_ep_unit_tests`magma::LSMTree::doMemtableFlushWork(this=0x000000011855a820) at lsm_tree.cc:531:18 [opt] frame #8: 0x000000010139fe62 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] magma::LSMTree::newFlush(this=<unavailable>)::$_16::operator()() const at lsm_tree.cc:993:34 [opt] frame #9: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] decltype(__f=<unavailable>)::$_16&>(fp)()) std::__1::__invoke<magma::LSMTree::newFlush()::$_16&>(magma::LSMTree::newFlush()::$_16&) at type_traits:3918:1 [opt] frame #10: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::tuple<magma::Status, magma::CheckpointTransaction> std::__1::__invoke_void_return_wrapper<std::__1::tuple<magma::Status, magma::CheckpointTransaction>, false>::__call<magma::LSMTree::newFlush(__args=<unavailable>)::$_16&>(magma::LSMTree::newFlush()::$_16&) at invoke.h:30:16 [opt] frame #11: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #12: 0x000000010139fe59 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::LSMTree::newFlush()::$_16, std::__1::allocator<magma::LSMTree::newFlush()::$_16>, std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #13: 0x00000001012f72af ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::__function::__value_func<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #14: 0x00000001012f7296 ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::function<std::__1::tuple<magma::Status, magma::CheckpointTransaction> ()>::operator(this=0x0000000118131560)() const at function.h:1182:12 [opt] frame #15: 0x00000001012f7292 ep-engine_ep_unit_tests`magma::FlushWork::Execute(this=0x0000000118131560) at flush_work.cc:61:29 [opt] frame #16: 0x0000000101389d5e ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x00007ff7bfefe1c0)::$_38::operator()() at kvstore.cc:515:27 [opt] frame #17: 0x0000000101388fac ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x000000010442a420, wal=<unavailable>, offset=(SegID = 1, SegOffset = 4096), flushMode=<unavailable>, blockMode=Blocking) at kvstore.cc:582:16 [opt] frame #18: 0x0000000101389a5a ep-engine_ep_unit_tests`magma::KVStore::FlushMemTables(this=<unavailable>, wal=<unavailable>, flushMode=<unavailable>, blockMode=<unavailable>) at kvstore.cc:387:12 [opt] frame #19: 0x00000001012fd9ba ep-engine_ep_unit_tests`magma::Magma::Impl::syncKVStore(this=0x000000011814f000, kvID=<unavailable>, checkpoint=true) at db.cc:1352:21 [opt] frame #20: 0x000000010132678a ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=0x00007ff7bfefe400)>)::$_7::operator()() const at db.cc:880:23 [opt] frame #21: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=<unavailable>)>)::$_8::operator()() const at db.cc:891:21 [opt] frame #22: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] decltype(__f=<unavailable>)>)::$_8&>(fp)()) std::__1::__invoke<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at type_traits:3918:1 [opt] frame #23: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] void std::__1::__invoke_void_return_wrapper<void, true>::__call<magma::Magma::Impl::CompactKVStore(__args=<unavailable>)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8&) at invoke.h:61:9 [opt] frame #24: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] std::__1::__function::__alloc_func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:178:16 [opt] frame #25: 0x0000000101326764 ep-engine_ep_unit_tests`std::__1::__function::__func<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8, std::__1::allocator<magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>)::$_8>, void ()>::operator(this=<unavailable>)() at function.h:352:12 [opt] frame #26: 0x0000000101303138 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::__function::__value_func<void ()>::operator(this=<unavailable>)() const at function.h:505:16 [opt] frame #27: 0x000000010130312d ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] std::__1::function<void ()>::operator(this=0x00007ff7bfefe4b0)() const at function.h:1182:12 [opt] frame #28: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:92:9 [opt] frame #29: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function<std::__1::unique_ptr<magma::Magma::CompactionCallback, std::__1::default_delete<magma::Magma::CompactionCallback> > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:91:14 [opt] frame #30: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(this=<unavailable>, kvID=<unavailable>, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefe550)>) at db.cc:895:1 [opt] frame #31: 0x000000010130336c ep-engine_ep_unit_tests`magma::Magma::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=<unavailable>, makeCallback=<unavailable>)>) at db.cc:901:18 [opt] frame #32: 0x000000010004fd3d ep-engine_ep_unit_tests`MagmaMemoryTrackingProxy::CompactKVStore(this=<unavailable>, kvID=0, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefea00)>) at magma-memory-tracking-proxy.cc:190:19 [opt] frame #33: 0x00000001000a9eeb ep-engine_ep_unit_tests`MagmaKVStore::compactDBInternal(this=<unavailable>, vbLock=0x00007ff7bfefeda0, ctx=std::__1::shared_ptr<CompactionContext>::element_type @ 0x00000001184acc20 strong=3 weak=1) at magma-kvstore.cc:2590:29 [opt] frame #34: 0x00000001000a93ad ep-engine_ep_unit_tests`MagmaKVStore::compactDB(this=0x00000001067e6500, vbLock=0x00007ff7bfefeda0, ctx=nullptr) at magma-kvstore.cc:2445:12 [opt] frame #35: 0x00000001001d7eb0 ep-engine_ep_unit_tests`EPBucket::compactInternal(this=0x00000001067e6000, vb=0x00007ff7bfefed90, config=<unavailable>) at ep_bucket.cc:1398:25 [opt] frame #36: 0x00000001001d83f6 ep-engine_ep_unit_tests`EPBucket::doCompact(this=0x00000001067e6000, vbid=(vbid = 0), config=0x00007ff7bfefedf0, cookies=size=0) at ep_bucket.cc:1476:14 [opt] 3) Key sorting issue Magma now checks for sorted keys - it turns out KV flushing is violating that ordering. Need to know if KV should fix or is the magma check required?? Example: CollectionsDcpEphemeralOrPersistent/CollectionsLegacyDcpTest.default_collection_is_not_vbucket_highseqno_with_pending/persistent_nexus_couchstore_magma_value_only CRITICAL [(SynchronousEPEngine:default) magma_0]Fatal error: Found: preceding key(d2) > current key( _collection). If history is enabled, all keys in the batch must be sorted lexicographicall The problem is that the test flushes a prepare(default collection, key=d2) and create-collection(fruit) together. The flusher orders these... \0d2 \1create_fruit This is correct. But \0d2 is marked as a prepare, when flushed to disk it goes into a special namespace. This occurs in KVStore after the sorting. \0d2 becomes \2\0d2 And magma actually sees \2\0d2 \1create_fruit and we have violated the expects Change-Id: Ica9ea1b52c51f125c9e8839a0fca412834fc25f7
1 parent 087b981 commit f95f2e7

File tree

12 files changed

+316
-87
lines changed

12 files changed

+316
-87
lines changed

engines/ep/src/kv_bucket.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3098,6 +3098,13 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const {
30983098

30993099
void KVBucket::setHistoryRetentionBytes(size_t bytes) {
31003100
historyRetentionBytes = bytes;
3101+
for (auto& i : vbMap.shards) {
3102+
KVShard* shard = i.get();
3103+
// The KVStore needs to know the per vbucket size
3104+
shard->getRWUnderlying()->setHistoryRetentionBytes(bytes /
3105+
vbMap.getSize());
3106+
}
3107+
31013108
}
31023109

31033110
size_t KVBucket::getHistoryRetentionBytes() const {

engines/ep/src/kvstore/kvstore.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,6 +862,13 @@ class KVStore : public KVStoreIface {
862862
*/
863863
void checkIfInTransaction(Vbid vbid, std::string_view caller);
864864

865+
void setHistoryRetentionBytes(size_t size) override {
866+
// no-op.
867+
// Only supported by backends which report
868+
// StorageProperties::HistoryRetentionAvailable::Yes
869+
// For all other backends this function is allowed, but does nothing.
870+
}
871+
865872
/**
866873
* Check if the specified document metadata is /potentially/ affected
867874
* by a datatype corruption issue (MB-52793) - a deleted document with

engines/ep/src/kvstore/kvstore_iface.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,11 @@ class KVStoreIface {
763763
* @param vbid ID of the vbucket being created
764764
*/
765765
virtual void prepareToCreateImpl(Vbid vbid) = 0;
766+
767+
/**
768+
* Method to configure the amount of history a vbucket should retain.
769+
*/
770+
virtual void setHistoryRetentionBytes(size_t size) = 0;
766771
};
767772

768773
std::string to_string(KVStoreIface::ReadVBStateStatus status);

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const {
869869
StorageProperties::AutomaticDeduplication::No,
870870
StorageProperties::PrepareCounting::No,
871871
StorageProperties::CompactionStaleItemCallbacks::Yes,
872-
StorageProperties::HistoryRetentionAvailable::No);
872+
StorageProperties::HistoryRetentionAvailable::Yes);
873873
return rv;
874874
}
875875

@@ -1695,8 +1695,7 @@ std::unique_ptr<BySeqnoScanContext> MagmaKVStore::initBySeqnoScanContext(
16951695
getDroppedStatus.String());
16961696
}
16971697

1698-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1699-
auto historyStartSeqno = 0;
1698+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
17001699
if (logger->should_log(spdlog::level::info)) {
17011700
logger->info(
17021701
"MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}"
@@ -1809,8 +1808,7 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
18091808
return nullptr;
18101809
}
18111810

1812-
// @todo:assign this using magma->GetOldestHistorySeqno(snapshot);
1813-
auto historyStartSeqno = 0;
1811+
auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot);
18141812
logger->info(
18151813
"MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} "
18161814
"KeyIterator:{}",
@@ -1830,13 +1828,16 @@ std::unique_ptr<ByIdScanContext> MagmaKVStore::initByIdScanContext(
18301828
historyStartSeqno);
18311829
}
18321830

1831+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1832+
return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot);
1833+
}
1834+
18331835
scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const {
1834-
// @todo use magma's mode
1835-
// return scan(ctx, magma::Magma::SeqIterator::Mode::History);
1836-
return scan(ctx);
1836+
return scan(ctx, magma::Magma::SeqIterator::Mode::History);
18371837
}
18381838

1839-
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
1839+
scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx,
1840+
magma::Magma::SeqIterator::Mode mode) const {
18401841
if (ctx.lastReadSeqno == ctx.maxSeqno) {
18411842
logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}",
18421843
ctx.vbid,
@@ -1849,7 +1850,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const {
18491850
startSeqno = ctx.lastReadSeqno + 1;
18501851
}
18511852
auto& mctx = dynamic_cast<MagmaScanContext&>(ctx);
1852-
for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid();
1853+
for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode);
1854+
mctx.itr->Valid();
18531855
mctx.itr->Next()) {
18541856
Slice keySlice, metaSlice, valSlice;
18551857
uint64_t seqno;
@@ -3722,3 +3724,7 @@ std::pair<Status, uint64_t> MagmaKVStore::getOldestRollbackableHighSeqno(
37223724

37233725
return {status, seqno};
37243726
}
3727+
3728+
void MagmaKVStore::setHistoryRetentionBytes(size_t size) {
3729+
magma->SetHistoryRetentionSize(size);
3730+
}

engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,12 @@ class MagmaKVStore : public KVStore {
568568
std::unique_ptr<TransactionContext> begin(
569569
Vbid vbid, std::unique_ptr<PersistenceCallback> pcb) override;
570570

571+
/**
572+
* Informs magma of how much history must be retained using
573+
* Magma::SetHistoryRetentionTime
574+
*/
575+
void setHistoryRetentionBytes(size_t size) override;
576+
571577
// Magma uses a unique logger with a prefix of magma so that all logging
572578
// calls from the wrapper thru magma will be prefixed with magma.
573579
std::shared_ptr<BucketLogger> logger;
@@ -782,6 +788,9 @@ class MagmaKVStore : public KVStore {
782788
folly::assume_unreachable();
783789
}
784790

791+
scan_error_t scan(BySeqnoScanContext& ctx,
792+
magma::Magma::SeqIterator::Mode mode) const;
793+
785794
MagmaKVStoreConfig& configuration;
786795

787796
/**

engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ class NexusKVStore : public KVStoreIface {
143143
void delSystemEvent(TransactionContext& txnCtx,
144144
const queued_item item) override;
145145
void endTransaction(Vbid vbid) override;
146+
void setHistoryRetentionBytes(size_t size) override {
147+
// not supported on Nexus
148+
}
146149

147150
/**
148151
* Unit test only hook called before we compact the first KVStore. Public as

engines/ep/tests/mock/mock_kvstore.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ class MockKVStore : public KVStore {
225225
prepareToRollback,
226226
(Vbid vbid),
227227
(override));
228+
MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override));
228229

229230
/**
230231
* Helper function to replace the existing read-write KVStore in the given

engines/ep/tests/module_tests/collections/collections_dcp_test.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,27 @@ class CollectionsDcpTest : virtual public SingleThreadedKVBucketTest {
103103
const CollectionEntry::Entry& entry,
104104
uint64_t seqno);
105105

106+
/**
107+
* This function (created for OSO tests) creates two collections (fruit
108+
* and vegetable) and calls writeTwoCollections
109+
*
110+
* @param endOnVegetable true and the last item written will be for the
111+
* vegetable collection
112+
* @return current manifest and vbucket (::vbid) high-seqno
113+
*/
114+
std::pair<CollectionsManifest, uint64_t> setupTwoCollections(
115+
bool endOnVegetable = false);
116+
106117
/**
107118
* This function (created for OSO tests) writes to two collections (fruit
108119
* and vegetable). The keys are "a", "b", "c" and "d" to demonstrate the
109120
* lexicographical ordering of an OSO snapshot.
110121
*
111122
* @param endOnVegetable true and the last item written will be for the
112-
* vegetable collection
113-
* @return manifest and high-seqno
123+
* vegetable collection
124+
* @return vbucket (::vbid) high-seqno
114125
*/
115-
std::pair<CollectionsManifest, uint64_t> setupTwoCollections(
116-
bool endOnVegetable = false);
126+
uint64_t writeTwoCollectios(bool endOnTarget);
117127

118128
static cb::engine_errc dcpAddFailoverLog(
119129
const std::vector<vbucket_failover_t>&);

engines/ep/tests/module_tests/collections/collections_kvstore_test.cc

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
9292

9393
void applyEvents(TransactionContext& txnCtx,
9494
VB::Commit& commitData,
95-
const CollectionsManifest& cm) {
95+
const CollectionsManifest& cm,
96+
bool writeEventNow = true) {
9697
manifest.update(*vbucket, makeManifest(cm));
9798

9899
std::vector<queued_item> events;
@@ -101,17 +102,44 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
101102

102103
for (auto& ev : events) {
103104
commitData.collections.recordSystemEvent(*ev);
105+
if (writeEventNow) {
106+
if (ev->isDeleted()) {
107+
kvstore->delSystemEvent(txnCtx, ev);
108+
} else {
109+
kvstore->setSystemEvent(txnCtx, ev);
110+
}
111+
}
112+
}
113+
if (!writeEventNow) {
114+
std::move(events.begin(),
115+
events.end(),
116+
std::back_inserter(allEvents));
117+
}
118+
}
119+
120+
void applyEvents(TransactionContext& txnCtx,
121+
const CollectionsManifest& cm,
122+
bool writeEventNow = true) {
123+
return applyEvents(txnCtx, flush, cm, writeEventNow);
124+
}
125+
126+
// This function is to be used in conjunction with applyEvents when
127+
// writeEventNow=false allowing a test to better emulate the flusher and
128+
// write keys in a sorted batch. Tests can applyEvents so that collection
129+
// metadata management does updates, but defer the system event writing
130+
// until ready to commit
131+
void sortAndWriteAllEvents(TransactionContext& txnCtx) {
132+
std::sort(allEvents.begin(),
133+
allEvents.end(),
134+
OrderItemsForDeDuplication{});
135+
for (auto& ev : allEvents) {
104136
if (ev->isDeleted()) {
105137
kvstore->delSystemEvent(txnCtx, ev);
106138
} else {
107139
kvstore->setSystemEvent(txnCtx, ev);
108140
}
109141
}
110-
}
111-
112-
void applyEvents(TransactionContext& txnCtx,
113-
const CollectionsManifest& cm) {
114-
applyEvents(txnCtx, flush, cm);
142+
allEvents.clear();
115143
}
116144

117145
void checkUid(const Collections::KVStore::Manifest& md,
@@ -224,7 +252,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
224252
VB::Commit commitData(manifest);
225253
auto ctx = kvstore->begin(vbucket->getId(),
226254
std::make_unique<PersistenceCallback>());
227-
applyEvents(*ctx, commitData, cm);
255+
applyEvents(*ctx, commitData, cm, false);
256+
sortAndWriteAllEvents(*ctx);
228257
kvstore->commit(std::move(ctx), commitData);
229258
auto [status, md] = kvstore->getCollectionsManifest(Vbid(0));
230259
EXPECT_TRUE(status);
@@ -240,6 +269,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest {
240269
VBucketPtr vbucket;
241270
WriteCallback wc;
242271
DeleteCallback dc;
272+
std::vector<queued_item> allEvents;
243273
};
244274

245275
class CollectionsKVStoreTest
@@ -583,19 +613,21 @@ class CollectionRessurectionKVStoreTest
583613
auto ctx = kvstore->begin(vbucket->getId(),
584614
std::make_unique<PersistenceCallback>());
585615
cm.add(targetScope);
586-
applyEvents(*ctx, cm);
616+
applyEvents(*ctx, cm, false);
587617
cm.add(target, targetScope);
588-
applyEvents(*ctx, cm);
618+
applyEvents(*ctx, cm, false);
619+
sortAndWriteAllEvents(*ctx);
589620
kvstore->commit(std::move(ctx), flush);
590621
}
591622

592623
// runs a flush batch that will leave the target collection in dropped state
593624
void dropScope() {
594625
openScopeOpenCollection();
595-
cm.remove(targetScope);
596626
auto ctx = kvstore->begin(vbucket->getId(),
597627
std::make_unique<PersistenceCallback>());
598-
applyEvents(*ctx, cm);
628+
cm.remove(targetScope);
629+
applyEvents(*ctx, cm, false);
630+
sortAndWriteAllEvents(*ctx);
599631
kvstore->commit(std::move(ctx), flush);
600632
}
601633

@@ -709,9 +741,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
709741
std::make_unique<PersistenceCallback>());
710742
if (!cm.exists(targetScope)) {
711743
cm.add(targetScope);
712-
applyEvents(*ctx, cm);
744+
applyEvents(*ctx, cm, false);
713745
cm.add(target, targetScope);
714-
applyEvents(*ctx, cm);
746+
applyEvents(*ctx, cm, false);
715747
}
716748

717749
std::string expectedName = target.name;
@@ -720,22 +752,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() {
720752
// iterate cycles of remove/add
721753
for (int ii = 0; ii < getCycles(); ii++) {
722754
cm.remove(scope);
723-
applyEvents(*ctx, cm);
724-
755+
applyEvents(*ctx, cm, false);
725756
if (resurectWithNewName()) {
726757
expectedName = target.name + "_" + std::to_string(ii);
727758
scope.name = targetScope.name + "_" + std::to_string(ii);
728759
}
729760
cm.add(scope);
730-
applyEvents(*ctx, cm);
761+
applyEvents(*ctx, cm, false);
731762
cm.add({expectedName, target.uid}, scope);
732-
applyEvents(*ctx, cm);
763+
applyEvents(*ctx, cm, false);
733764
}
734765

735766
if (dropCollectionAtEnd()) {
736767
cm.remove(scope);
737-
applyEvents(*ctx, cm);
768+
applyEvents(*ctx, cm, false);
738769
}
770+
771+
sortAndWriteAllEvents(*ctx);
739772
kvstore->commit(std::move(ctx), flush);
740773

741774
// Now validate

engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,15 @@ class CollectionsOSODcpTest : public CollectionsDcpParameterizedTest {
4545

4646
std::pair<CollectionsManifest, uint64_t>
4747
CollectionsDcpTest::setupTwoCollections(bool endOnTarget) {
48-
VBucketPtr vb = store->getVBucket(vbid);
4948
CollectionsManifest cm(CollectionEntry::fruit);
5049
setCollections(cookie, cm.add(CollectionEntry::vegetable));
50+
flush_vbucket_to_disk(vbid, 2);
51+
return {cm, writeTwoCollectios(endOnTarget)};
52+
}
5153

52-
// Interleave the writes to two collections and then OSO backfill one
54+
uint64_t CollectionsDcpTest::writeTwoCollectios(bool endOnTarget) {
55+
// Interleave the writes to two collections, this is linked to expectations
56+
// in CollectionsOSODcpTest test harness
5357
store_item(vbid, makeStoredDocKey("b", CollectionEntry::fruit), "q");
5458
store_item(vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "q");
5559
store_item(vbid, makeStoredDocKey("d", CollectionEntry::fruit), "a");
@@ -66,8 +70,8 @@ CollectionsDcpTest::setupTwoCollections(bool endOnTarget) {
6670
vbid, makeStoredDocKey("c", CollectionEntry::vegetable), "q");
6771
store_item(vbid, makeStoredDocKey("c", CollectionEntry::fruit), "y");
6872
}
69-
flush_vbucket_to_disk(vbid, 10); // 8 keys + 2 events
70-
return {cm, vb->getHighSeqno()};
73+
flush_vbucket_to_disk(vbid, 8);
74+
return store->getVBucket(vbid)->getHighSeqno();
7175
}
7276

7377
// Run through how we expect OSO to work, this is a minimal test which will
@@ -674,7 +678,6 @@ TEST_P(CollectionsOSODcpTest, MB_43700) {
674678
// snapshots
675679
class CollectionsOSOEphemeralTest : public CollectionsDcpParameterizedTest {
676680
public:
677-
std::pair<CollectionsManifest, uint64_t> setupTwoCollections();
678681
};
679682

680683
// Run through how we expect OSO to work, this is a minimal test which will

0 commit comments

Comments
 (0)