Skip to content

Commit 3ae55df

Browse files
jameseh96daverigby
authored andcommitted
RocksDB: Add basic save and read of _local/vbstate
Largely copied from CouchKVStore - ripe for refactoring. Change-Id: I72e74af9337475b16742db05b69ce7911e64000b Reviewed-on: http://review.couchbase.org/82746 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 9279241 commit 3ae55df

File tree

5 files changed

+259
-57
lines changed

5 files changed

+259
-57
lines changed

engines/ep/src/rocksdb-kvstore/rocksdb-kvstore.cc

Lines changed: 231 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,18 @@
2121

2222
#include <string.h>
2323
#include <algorithm>
24+
#include <limits>
25+
26+
#include "vbucket.h"
2427

2528
static const size_t DEFAULT_VAL_SIZE(64 * 1024);
2629

2730
RocksDBKVStore::RocksDBKVStore(KVStoreConfig& config)
28-
: KVStore(config), valBuffer(NULL), valSize(0), scanCounter(0) {
31+
: KVStore(config),
32+
valBuffer(NULL),
33+
valSize(0),
34+
scanCounter(0),
35+
logger(config.getLogger()) {
2936
cachedVBStates.reserve(configuration.getMaxVBuckets());
3037
cachedVBStates.assign(configuration.getMaxVBuckets(), nullptr);
3138

@@ -65,7 +72,8 @@ void RocksDBKVStore::open() {
6572
rocksdb::ColumnFamilyOptions()),
6673

6774
rocksdb::ColumnFamilyDescriptor("vbid_seqno_to_key",
68-
seqnoCFOptions)};
75+
seqnoCFOptions),
76+
rocksdb::ColumnFamilyDescriptor("_local", localCFOptions)};
6977

7078
std::vector<rocksdb::ColumnFamilyHandle*> handles;
7179

@@ -83,12 +91,23 @@ void RocksDBKVStore::open() {
8391

8492
defaultFamilyHandle.reset(handles[0]);
8593
seqnoFamilyHandle.reset(handles[1]);
94+
localFamilyHandle.reset(handles[2]);
95+
96+
// Attempt to read persisted vb states
97+
std::unique_ptr<rocksdb::Iterator> it(
98+
db->NewIterator(rocksdb::ReadOptions(), localFamilyHandle.get()));
99+
for (it->SeekToFirst(); it->Valid(); it->Next()) {
100+
uint16_t vb = std::stoi(it->key().ToString().substr(
101+
getVbstatePrefix().length(), std::string::npos));
102+
readVBState(vb);
103+
}
86104
}
87105

88106
void RocksDBKVStore::close() {
89107
batch.reset();
90108
defaultFamilyHandle.reset();
91109
seqnoFamilyHandle.reset();
110+
localFamilyHandle.reset();
92111
db.reset();
93112
}
94113

@@ -132,10 +151,7 @@ void RocksDBKVStore::adjustValBuffer(const size_t to) {
132151
}
133152

134153
std::vector<vbucket_state*> RocksDBKVStore::listPersistedVbuckets() {
135-
// TODO RDB: Something useful.
136-
// std::map<std::pair<uint16_t, uint16_t>, vbucket_state> rv;
137-
std::vector<vbucket_state*> rv;
138-
return rv;
154+
return cachedVBStates;
139155
}
140156

141157
void RocksDBKVStore::set(const Item& itm, Callback<mutation_result>& cb) {
@@ -254,7 +270,32 @@ void RocksDBKVStore::delVBucket(uint16_t vb, uint64_t vb_version) {
254270
bool RocksDBKVStore::snapshotVBucket(uint16_t vbucketId,
255271
const vbucket_state& vbstate,
256272
VBStatePersist options) {
257-
// TODO RDB: Implement
273+
// TODO RDB: Refactor out behaviour common to this and CouchKVStore
274+
auto start = ProcessClock::now();
275+
276+
if (updateCachedVBState(vbucketId, vbstate) &&
277+
(options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
278+
options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
279+
if (!saveVBState(vbstate, vbucketId)) {
280+
logger.log(EXTENSION_LOG_WARNING,
281+
"RocksDBKVStore::snapshotVBucket: saveVBState failed "
282+
"state:%s, vb:%" PRIu16,
283+
VBucket::toString(vbstate.state),
284+
vbucketId);
285+
return false;
286+
}
287+
}
288+
289+
LOG(EXTENSION_LOG_DEBUG,
290+
"RocksDBKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16
291+
" state:%s",
292+
vbucketId,
293+
vbstate.toJSON().c_str());
294+
295+
st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
296+
ProcessClock::now() - start)
297+
.count());
298+
258299
return true;
259300
}
260301

@@ -425,6 +466,189 @@ GetValue RocksDBKVStore::makeGetValue(uint16_t vb,
425466
grokValSlice(vb, key, sval, getMetaOnly), ENGINE_SUCCESS, -1, 0);
426467
}
427468

469+
void RocksDBKVStore::readVBState(uint16_t vbid) {
470+
// Largely copied from CouchKVStore
471+
// TODO RDB: refactor out sections common to CouchKVStore
472+
vbucket_state_t state = vbucket_state_dead;
473+
uint64_t checkpointId = 0;
474+
uint64_t maxDeletedSeqno = 0;
475+
int64_t highSeqno = readHighSeqnoFromDisk(vbid);
476+
std::string failovers;
477+
uint64_t purgeSeqno = 0;
478+
uint64_t lastSnapStart = 0;
479+
uint64_t lastSnapEnd = 0;
480+
uint64_t maxCas = 0;
481+
int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
482+
bool mightContainXattrs = false;
483+
484+
std::string lDocKey = getVbstatePrefix() + std::to_string(vbid);
485+
std::string statjson;
486+
487+
rocksdb::Status s = db->Get(rocksdb::ReadOptions(),
488+
localFamilyHandle.get(),
489+
lDocKey,
490+
&statjson);
491+
492+
if (!s.ok()) {
493+
if (s.IsNotFound()) {
494+
logger.log(EXTENSION_LOG_NOTICE,
495+
"RocksDBKVStore::readVBState: '_local/vbstate.%" PRIu16
496+
"' not found",
497+
vbid);
498+
} else {
499+
logger.log(EXTENSION_LOG_WARNING,
500+
"RocksDBKVStore::readVBState: error getting vbstate"
501+
" error:%s, vb:%" PRIu16,
502+
s.getState(),
503+
vbid);
504+
}
505+
} else {
506+
cJSON* jsonObj = cJSON_Parse(statjson.c_str());
507+
if (!jsonObj) {
508+
logger.log(EXTENSION_LOG_WARNING,
509+
"RocksKVStore::readVBState: Failed to "
510+
"parse the vbstat json doc for vb:%" PRIu16 ", json:%s",
511+
vbid,
512+
statjson.c_str());
513+
}
514+
515+
const std::string vb_state =
516+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
517+
const std::string checkpoint_id =
518+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
519+
const std::string max_deleted_seqno = getJSONObjString(
520+
cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
521+
const std::string snapStart =
522+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
523+
const std::string snapEnd =
524+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
525+
const std::string maxCasValue =
526+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
527+
const std::string hlcCasEpoch =
528+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
529+
mightContainXattrs = getJSONObjBool(
530+
cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
531+
532+
cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
533+
if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
534+
max_deleted_seqno.compare("") == 0) {
535+
logger.log(EXTENSION_LOG_WARNING,
536+
"RocksDBKVStore::readVBState: State"
537+
" JSON doc for vb:%" PRIu16
538+
" is in the wrong format:%s, "
539+
"vb state:%s, checkpoint id:%s and max deleted seqno:%s",
540+
vbid,
541+
statjson.c_str(),
542+
vb_state.c_str(),
543+
checkpoint_id.c_str(),
544+
max_deleted_seqno.c_str());
545+
} else {
546+
state = VBucket::fromString(vb_state.c_str());
547+
maxDeletedSeqno = std::stoull(max_deleted_seqno);
548+
checkpointId = std::stoull(checkpoint_id);
549+
550+
if (snapStart.compare("") == 0) {
551+
lastSnapStart = highSeqno;
552+
} else {
553+
lastSnapStart = std::stoull(snapStart.c_str());
554+
}
555+
556+
if (snapEnd.compare("") == 0) {
557+
lastSnapEnd = highSeqno;
558+
} else {
559+
lastSnapEnd = std::stoull(snapEnd.c_str());
560+
}
561+
562+
if (maxCasValue.compare("") != 0) {
563+
maxCas = std::stoull(maxCasValue.c_str());
564+
}
565+
566+
if (!hlcCasEpoch.empty()) {
567+
hlcCasEpochSeqno = std::stoull(hlcCasEpoch);
568+
}
569+
570+
if (failover_json) {
571+
char* json = cJSON_PrintUnformatted(failover_json);
572+
failovers.assign(json);
573+
cJSON_Free(json);
574+
}
575+
}
576+
cJSON_Delete(jsonObj);
577+
}
578+
579+
delete cachedVBStates[vbid];
580+
cachedVBStates[vbid] = new vbucket_state(state,
581+
checkpointId,
582+
maxDeletedSeqno,
583+
highSeqno,
584+
purgeSeqno,
585+
lastSnapStart,
586+
lastSnapEnd,
587+
maxCas,
588+
hlcCasEpochSeqno,
589+
mightContainXattrs,
590+
failovers);
591+
}
592+
593+
bool RocksDBKVStore::saveVBState(const vbucket_state& vbState, uint16_t vbid) {
594+
std::stringstream jsonState;
595+
596+
jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
597+
<< ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
598+
<< ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno
599+
<< "\"";
600+
if (!vbState.failovers.empty()) {
601+
jsonState << ",\"failover_table\": " << vbState.failovers;
602+
}
603+
jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
604+
<< ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
605+
<< ",\"max_cas\": \"" << vbState.maxCas << "\""
606+
<< ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
607+
608+
if (vbState.mightContainXattrs) {
609+
jsonState << ",\"might_contain_xattrs\": true";
610+
} else {
611+
jsonState << ",\"might_contain_xattrs\": false";
612+
}
613+
614+
jsonState << "}";
615+
616+
std::string lDocKey = getVbstatePrefix() + std::to_string(vbid);
617+
618+
rocksdb::Status s = db->Put(writeOptions,
619+
localFamilyHandle.get(),
620+
rocksdb::Slice(lDocKey),
621+
rocksdb::Slice(jsonState.str()));
622+
return s.ok();
623+
}
624+
625+
int64_t RocksDBKVStore::readHighSeqnoFromDisk(uint16_t vbid) {
626+
rocksdb::Iterator* it =
627+
db->NewIterator(rocksdb::ReadOptions(), seqnoFamilyHandle.get());
628+
629+
// Seek to the highest seqno=>key mapping stored for the vbid
630+
std::string start = mkSeqnoStr(vbid, std::numeric_limits<int64_t>::max());
631+
it->SeekForPrev(start);
632+
633+
if (it->Valid()) {
634+
uint16_t vb;
635+
int64_t seqno;
636+
637+
rocksdb::Slice seqnoSlice = it->key();
638+
grokSeqnoSlice(seqnoSlice, &vb, &seqno);
639+
640+
if (vb == vbid) {
641+
return seqno;
642+
}
643+
}
644+
645+
return 0;
646+
}
647+
648+
std::string RocksDBKVStore::getVbstatePrefix() {
649+
return "vbstate.";
650+
}
651+
428652
ScanContext* RocksDBKVStore::initScanContext(
429653
std::shared_ptr<Callback<GetValue> > cb,
430654
std::shared_ptr<Callback<CacheLookup> > cl,

engines/ep/src/rocksdb-kvstore/rocksdb-kvstore.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,15 +304,17 @@ class RocksDBKVStore : public KVStore {
304304
*/
305305
std::unique_ptr<rocksdb::DB> db;
306306

307+
VbidSeqnoComparator vbidSeqnoComparator;
307308
std::unique_ptr<rocksdb::ColumnFamilyHandle> defaultFamilyHandle;
308309
std::unique_ptr<rocksdb::ColumnFamilyHandle> seqnoFamilyHandle;
309-
VbidSeqnoComparator vbidSeqnoComparator;
310+
std::unique_ptr<rocksdb::ColumnFamilyHandle> localFamilyHandle;
310311

311312
char* valBuffer;
312313
size_t valSize;
313314

314315
rocksdb::Options rdbOptions;
315316
rocksdb::ColumnFamilyOptions seqnoCFOptions;
317+
rocksdb::ColumnFamilyOptions localCFOptions;
316318

317319
void open();
318320

@@ -341,6 +343,14 @@ class RocksDBKVStore : public KVStore {
341343

342344
void storeItem(const Item& item);
343345

346+
void readVBState(uint16_t vbid);
347+
348+
bool saveVBState(const vbucket_state& vbState, uint16_t vbid);
349+
350+
int64_t readHighSeqnoFromDisk(uint16_t vbid);
351+
352+
std::string getVbstatePrefix();
353+
344354
std::unique_ptr<rocksdb::WriteBatch> batch;
345355
rocksdb::WriteOptions writeOptions;
346356

@@ -350,4 +360,6 @@ class RocksDBKVStore : public KVStore {
350360
std::mutex writeLock;
351361

352362
std::atomic<size_t> scanCounter; // atomic counter for generating scan id
363+
364+
Logger& logger;
353365
};

engines/ep/src/warmup.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,10 +1330,9 @@ uint16_t Warmup::getNumKVStores()
13301330
return 1;
13311331
} else if (config.getBackend().compare("forestdb") == 0) {
13321332
return config.getMaxNumShards();
1333+
} else if (config.getBackend().compare("rocksdb") == 0) {
1334+
return config.getMaxNumShards();
13331335
}
1334-
// TODO vmx 2016-11-10: I guess there must be added something sensible
1335-
// for the leveldb backend here.
1336-
13371336
return 0;
13381337
}
13391338

0 commit comments

Comments
 (0)