Skip to content

Commit 3df4fca

Browse files
jameseh96 and daverigby
authored and committed
RocksDB: Implement RocksDBKVStore::scan basics
Lays the initial groundwork for iterating all keys by seqno. At the moment, the ColumnFamily mapping seqno=>key will grow forever; old mappings are never removed. Removal could be done in a compaction filter, a periodic task, etc. The main costs currently are slower scans (time is wasted skipping seqno=>key mappings whose seqno is no longer current) and ever-growing storage usage. Change-Id: I4431a498cb59248756f90d8b7b3469daddb05a21 Reviewed-on: http://review.couchbase.org/82747 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 3ae55df commit 3df4fca

File tree

5 files changed

+142
-80
lines changed

5 files changed

+142
-80
lines changed

engines/ep/src/rocksdb-kvstore/rocksdb-kvstore.cc

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -656,16 +656,133 @@ ScanContext* RocksDBKVStore::initScanContext(
656656
uint64_t startSeqno,
657657
DocumentFilter options,
658658
ValueFilter valOptions) {
659-
// TODO RDB vmx 2016-10-29: implement
660659
size_t scanId = scanCounter++;
660+
// As we cannot efficiently determine how many documents this scan will
661+
// find, we approximate this value with the seqno difference + 1
662+
// as scan is supposed to be inclusive at both ends,
663+
// seqnos 2 to 4 covers 3 docs not 4 - 2 = 2
664+
665+
uint64_t endSeqno = cachedVBStates[vbid]->highSeqno;
661666
return new ScanContext(cb,
662667
cl,
663668
vbid,
664669
scanId,
665670
startSeqno,
666-
99999999,
671+
endSeqno,
667672
options,
668673
valOptions,
669-
999999,
674+
/* documentCount */ endSeqno - startSeqno + 1,
670675
configuration);
671676
}
677+
678+
/**
 * Walk the seqno => key index for ctx->vbid and feed each current document
 * to the scan context's callbacks.
 *
 * The walk begins at ctx->startSeqno, or at ctx->lastReadSeqno + 1 when a
 * previous call already made progress (lastReadSeqno != 0), and stops once
 * the index key compares greater than (ctx->vbid, ctx->maxSeqno).
 *
 * Index entries whose document cannot be read, or whose document now carries
 * a higher seqno than the index entry, are skipped as stale.
 *
 * @param ctx scan state; its lastReadSeqno is advanced as items are consumed.
 * @return scan_failed  if ctx is null;
 *         scan_again   if either callback reports ENGINE_ENOMEM (the item
 *                      that hit ENOMEM is not recorded in lastReadSeqno, so
 *                      a later call starts from it again);
 *         scan_success otherwise.
 * @throws std::logic_error if a document read through the snapshot has a
 *         lower seqno than the index entry that points at it.
 */
scan_error_t RocksDBKVStore::scan(ScanContext* ctx) {
    if (!ctx) {
        return scan_failed;
    }

    // Nothing left to do: a previous call already reached the end.
    if (ctx->lastReadSeqno == ctx->maxSeqno) {
        return scan_success;
    }

    uint64_t start = ctx->startSeqno;
    if (ctx->lastReadSeqno != 0) {
        start = ctx->lastReadSeqno + 1;
    }

    // The filters are fixed for the whole scan, so derive them once up front.
    const GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY
                                           ? GetMetaOnly::Yes
                                           : GetMetaOnly::No;
    const bool includeDeletes = ctx->docFilter != DocumentFilter::NO_DELETES;
    const bool onlyKeys = ctx->valFilter == ValueFilter::KEYS_ONLY;

    // A snapshot obtained from GetSnapshot() must be handed back with
    // ReleaseSnapshot(). Owning it through a unique_ptr guarantees that on
    // every exit path (early return, scan_again, exception). It is declared
    // before the iterator so that the iterator is destroyed first.
    auto releaseSnapshot = [this](const rocksdb::Snapshot* snap) {
        db->ReleaseSnapshot(snap);
    };
    std::unique_ptr<const rocksdb::Snapshot, decltype(releaseSnapshot)>
            snapshot(db->GetSnapshot(), releaseSnapshot);

    rocksdb::ReadOptions snapshotOpts;
    snapshotOpts.snapshot = snapshot.get();

    std::unique_ptr<rocksdb::Iterator> it(
            db->NewIterator(snapshotOpts, seqnoFamilyHandle.get()));

    std::string startStr = mkSeqnoStr(ctx->vbid, start);
    it->Seek(startStr);

    std::string endStr = mkSeqnoStr(ctx->vbid, ctx->maxSeqno);

    // Comparators only promise the sign of the result, not the value 1.
    auto isPastEnd = [&endStr, this](rocksdb::Slice seqSlice) {
        return vbidSeqnoComparator.Compare(seqSlice, endStr) > 0;
    };

    for (; it->Valid() && !isPastEnd(it->key()); it->Next()) {
        uint16_t vb = 0;
        int64_t seqno = 0;

        rocksdb::Slice seqnoSlice = it->key();
        grokSeqnoSlice(seqnoSlice, &vb, &seqno);

        // The index value is the document's key in the default family.
        rocksdb::Slice keySlice = it->value();

        std::string valueStr;
        rocksdb::Status s = db->Get(snapshotOpts, keySlice, &valueStr);

        if (!s.ok()) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item does not exist (s.isNotFound())
            // the seqno => key mapping could be removed; not even
            // a tombstone remains of that item.
            continue;
        }

        rocksdb::Slice valSlice(valueStr);

        // TODO RDB: Deal with collections
        // The first sizeof(uint16_t) bytes of the stored key are skipped;
        // presumably they hold the vbucket id prefix - confirm against the
        // key-encoding helper.
        DocKey key(reinterpret_cast<const uint8_t*>(keySlice.data() +
                                                    sizeof(uint16_t)),
                   keySlice.size() - sizeof(uint16_t),
                   DocNamespace::DefaultCollection);

        std::unique_ptr<Item> itm =
                grokValSlice(ctx->vbid, key, valSlice, isMetaOnly);

        if (itm->getBySeqno() > seqno) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item has a newer seqno now, the stale
            // seqno => key mapping could be removed
            continue;
        } else if (itm->getBySeqno() < seqno) {
            throw std::logic_error(
                    "RocksDBKVStore::scan: index has a higher seqno "
                    "than the document in a snapshot!");
        }

        if (!includeDeletes && itm->getOperation() == queue_op::del) {
            continue;
        }

        int64_t byseqno = itm->getBySeqno();
        CacheLookup lookup(key, byseqno, ctx->vbid);
        ctx->lookup->callback(lookup);

        int status = ctx->lookup->getStatus();

        if (status == ENGINE_KEY_EEXISTS) {
            // The lookup callback reported KEY_EEXISTS: record progress and
            // move on without invoking the value callback.
            ctx->lastReadSeqno = byseqno;
            continue;
        } else if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys);
        ctx->callback->callback(rv);
        status = ctx->callback->getStatus();

        if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        ctx->lastReadSeqno = byseqno;
    }

    cb_assert(it->status().ok()); // Check for any errors found during the scan

    return scan_success;
}

engines/ep/src/rocksdb-kvstore/rocksdb-kvstore.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,7 @@ class RocksDBKVStore : public KVStore {
262262
DocumentFilter options,
263263
ValueFilter valOptions) override;
264264

265-
scan_error_t scan(ScanContext* sctx) override {
266-
// TODO vmx 2016-10-29: implement
267-
return scan_success;
268-
}
265+
scan_error_t scan(ScanContext* sctx) override;
269266

270267
void destroyScanContext(ScanContext* ctx) override {
271268
// TODO vmx 2016-10-29: implement

engines/ep/tests/ep_testsuite.cc

Lines changed: 13 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7452,14 +7452,8 @@ BaseTestCase testsuite_testcases[] = {
74527452
cleanup),
74537453
TestCase("test memory condition", test_memory_condition, test_setup,
74547454
teardown, "max_size=2621440", prepare_ep_bucket, cleanup),
7455-
TestCase("warmup conf",
7456-
test_warmup_conf,
7457-
test_setup,
7458-
teardown,
7459-
NULL,
7460-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7461-
prepare_skip_broken_under_rocks,
7462-
cleanup),
7455+
TestCase("warmup conf", test_warmup_conf, test_setup,
7456+
teardown, NULL, prepare, cleanup),
74637457
TestCase("bloomfilter conf", test_bloomfilter_conf, test_setup,
74647458
teardown, NULL, prepare, cleanup),
74657459
TestCase("test bloomfilters",
@@ -7501,14 +7495,9 @@ BaseTestCase testsuite_testcases[] = {
75017495
cleanup),
75027496
TestCase("test set_param message", test_set_param_message, test_setup,
75037497
teardown, "chk_remover_stime=1;max_size=6291456", prepare, cleanup),
7504-
TestCase("test warmup oom value eviction",
7505-
test_warmup_oom,
7506-
test_setup,
7507-
teardown,
7508-
"item_eviction_policy=full_eviction",
7509-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7510-
prepare_skip_broken_under_rocks,
7511-
cleanup),
7498+
TestCase("test warmup oom value eviction", test_warmup_oom, test_setup,
7499+
teardown, "item_eviction_policy=full_eviction",
7500+
prepare, cleanup),
75127501

75137502
// Stats tests
75147503
TestCase("item stats", test_item_stats, test_setup, teardown, NULL,
@@ -7669,40 +7658,16 @@ BaseTestCase testsuite_testcases[] = {
76697658
prepare_skip_broken_under_rocks,
76707659
cleanup),
76717660
// special non-Ascii keys
7672-
TestCase("test special char keys",
7673-
test_specialKeys,
7674-
test_setup,
7675-
teardown,
7676-
NULL,
7677-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7678-
prepare_skip_broken_under_rocks,
7679-
cleanup),
7680-
TestCase("test binary keys",
7681-
test_binKeys,
7682-
test_setup,
7683-
teardown,
7684-
NULL,
7685-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7686-
prepare_skip_broken_under_rocks,
7687-
cleanup),
7661+
TestCase("test special char keys", test_specialKeys, test_setup,
7662+
teardown, NULL, prepare, cleanup),
7663+
TestCase("test binary keys", test_binKeys, test_setup,
7664+
teardown, NULL, prepare, cleanup),
76887665

76897666
// restart tests
7690-
TestCase("test restart",
7691-
test_restart,
7692-
test_setup,
7693-
teardown,
7694-
NULL,
7695-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7696-
prepare_skip_broken_under_rocks,
7697-
cleanup),
7698-
TestCase("set+get+restart+hit (bin)",
7699-
test_restart_bin_val,
7700-
test_setup,
7701-
teardown,
7702-
NULL,
7703-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
7704-
prepare_skip_broken_under_rocks,
7705-
cleanup),
7667+
TestCase("test restart", test_restart, test_setup,
7668+
teardown, NULL, prepare, cleanup),
7669+
TestCase("set+get+restart+hit (bin)", test_restart_bin_val,
7670+
test_setup, teardown, NULL, prepare, cleanup),
77067671
TestCase("flush+restart",
77077672
test_flush_restart,
77087673
test_setup,

engines/ep/tests/ep_testsuite_basic.cc

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2293,14 +2293,8 @@ BaseTestCase testsuite_testcases[] = {
22932293
teardown, NULL, prepare, cleanup),
22942294
TestCase("set/delete (invalid cas)", test_set_delete_invalid_cas,
22952295
test_setup, teardown, NULL, prepare, cleanup),
2296-
TestCase("delete/set/delete",
2297-
test_delete_set,
2298-
test_setup,
2299-
teardown,
2300-
NULL,
2301-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
2302-
prepare_skip_broken_under_rocks,
2303-
cleanup),
2296+
TestCase("delete/set/delete", test_delete_set, test_setup,
2297+
teardown, NULL, prepare, cleanup),
23042298
TestCase("get/delete with missing db file",
23052299
test_get_delete_missing_file,
23062300
test_setup,

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5989,12 +5989,9 @@ BaseTestCase testsuite_testcases[] = {
59895989
TestCase("test dcp consumer noop", test_dcp_consumer_noop, test_setup,
59905990
teardown, nullptr, prepare, cleanup),
59915991
TestCase("test dcp replica stream backfill",
5992-
test_dcp_replica_stream_backfill,
5993-
test_setup,
5994-
teardown,
5992+
test_dcp_replica_stream_backfill, test_setup, teardown,
59955993
"chk_remover_stime=1;max_checkpoints=2",
5996-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
5997-
prepare_skip_broken_under_rocks,
5994+
prepare,
59985995
cleanup),
59995996
TestCase("test dcp replica stream in-memory",
60005997
test_dcp_replica_stream_in_memory, test_setup, teardown,
@@ -6022,15 +6019,12 @@ BaseTestCase testsuite_testcases[] = {
60226019
prepare_skip_broken_under_rocks,
60236020
cleanup),
60246021
TestCase("test producer stream request (partial)",
6025-
test_dcp_producer_stream_req_partial,
6026-
test_setup,
6027-
teardown,
6022+
test_dcp_producer_stream_req_partial, test_setup, teardown,
60286023
/* set chk_period to essentially infinity so it won't run
60296024
during this test and create extra checkpoints we don't want.*/
60306025
"chk_remover_stime=1;chk_max_items=100;"
60316026
"chk_period=1000000",
6032-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
6033-
prepare_skip_broken_under_rocks,
6027+
prepare,
60346028
cleanup),
60356029
TestCase("test producer stream request (full merged snapshots)",
60366030
test_dcp_producer_stream_req_full_merged_snapshots,
@@ -6382,13 +6376,8 @@ BaseTestCase testsuite_testcases[] = {
63826376
test_set_dcp_param, test_setup, teardown, NULL,
63836377
prepare, cleanup),
63846378
TestCase("test MB-23863 backfill deleted value",
6385-
test_dcp_producer_deleted_item_backfill,
6386-
test_setup,
6387-
teardown,
6388-
NULL,
6389-
/* TODO RDB: Enable after implementing RocksDBKVStore::scan */
6390-
prepare_ep_bucket_skip_broken_under_rocks,
6391-
cleanup),
6379+
test_dcp_producer_deleted_item_backfill, test_setup, teardown,
6380+
NULL, prepare_ep_bucket, cleanup),
63926381
TestCase("test noop mandatory",test_dcp_noop_mandatory,
63936382
test_setup, teardown, NULL, prepare, cleanup),
63946383

0 commit comments

Comments
 (0)