Skip to content

Commit ea8dcdd

Browse files
committed
MB-34173: Merge 'couchbase/vulcan' into 'couchbase/alice'
* couchbase/vulcan:
  MB-34173: 2/2 Check for illegal snapshot in vbstate document
  MB-34173: 1/2 Persist a valid snapshot range during backfill

Change-Id: I67a557b33832f64ff9bafa301f545ac38c83316a
2 parents 55dedbb + 5e9fca9 commit ea8dcdd

File tree

13 files changed

+411
-108
lines changed

13 files changed

+411
-108
lines changed

engines/ep/src/checkpoint.cc

Lines changed: 6 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1272,16 +1272,15 @@ CheckpointManager::ItemsForCursor CheckpointManager::getItemsForCursor(
12721272
"manager on vb:%" PRIu16,
12731273
name.c_str(),
12741274
vbucketId);
1275-
return {};
1275+
return {0, 0};
12761276
}
12771277

12781278
auto& cursor = it->second;
12791279

12801280
// Fetch whole checkpoints; as long as we don't exceed the approx item
12811281
// limit.
1282-
ItemsForCursor result;
1283-
result.range.start = (*cursor.currentCheckpoint)->getSnapshotStartSeqno();
1284-
result.range.end = (*cursor.currentCheckpoint)->getSnapshotEndSeqno();
1282+
ItemsForCursor result((*cursor.currentCheckpoint)->getSnapshotStartSeqno(),
1283+
(*cursor.currentCheckpoint)->getSnapshotEndSeqno());
12851284
size_t itemCount = 0;
12861285
while ((result.moreAvailable = incrCursor(cursor))) {
12871286
queued_item& qi = *(cursor.currentPos);
@@ -1627,10 +1626,9 @@ snapshot_info_t CheckpointManager::getSnapshotInfo() {
16271626
"checkpointList is empty");
16281627
}
16291628

1630-
snapshot_info_t info;
1631-
info.range.start = checkpointList.back()->getSnapshotStartSeqno();
1632-
info.start = lastBySeqno;
1633-
info.range.end = checkpointList.back()->getSnapshotEndSeqno();
1629+
snapshot_info_t info(lastBySeqno,
1630+
{checkpointList.back()->getSnapshotStartSeqno(),
1631+
checkpointList.back()->getSnapshotEndSeqno()});
16341632

16351633
// If there are no items in the open checkpoint then we need to resume by
16361634
// using that sequence numbers of the last closed snapshot. The exception is

engines/ep/src/checkpoint.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -628,7 +628,9 @@ class CheckpointManager {
628628

629629
/// Return type of getItemsForCursor()
630630
struct ItemsForCursor {
631-
snapshot_range_t range = {0, 0};
631+
ItemsForCursor(uint64_t start, uint64_t end) : range(start, end) {
632+
}
633+
snapshot_range_t range;
632634
bool moreAvailable = {false};
633635
};
634636

engines/ep/src/couch-kvstore/couch-kvstore.cc

Lines changed: 142 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -346,9 +346,10 @@ void CouchKVStore::initialize() {
346346
DbHolder db(*this);
347347
errorCode = openDB(id, db, COUCHSTORE_OPEN_FLAG_RDONLY);
348348
if (errorCode == COUCHSTORE_SUCCESS) {
349-
readVBState(db, id);
350-
/* update stat */
351-
++st.numLoadedVb;
349+
if (readVBState(db, id) == ReadVBStateStatus::Success) {
350+
/* update stat */
351+
++st.numLoadedVb;
352+
}
352353
} else {
353354
logger.log(EXTENSION_LOG_WARNING,
354355
"CouchKVStore::initialize: openDB"
@@ -2157,114 +2158,161 @@ void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
21572158
}
21582159
}
21592160

2160-
ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
2161+
std::pair<CouchKVStore::ReadVBStateStatus, snapshot_info_t>
2162+
CouchKVStore::processVbstateSnapshot(uint16_t vb,
2163+
vbucket_state_t state,
2164+
int64_t version,
2165+
const std::string& snapStart,
2166+
const std::string& snapEnd,
2167+
uint64_t highSeqno) {
2168+
ReadVBStateStatus status = ReadVBStateStatus::Success;
2169+
2170+
// All upgrade paths we now expect start and end
2171+
uint64_t start = 0, end = 0;
2172+
if (!parseUint64(snapStart.c_str(), &start) ||
2173+
!parseUint64(snapEnd.c_str(), &end)) {
2174+
start = end = uint64_t(highSeqno);
2175+
} else if (!(highSeqno >= start && highSeqno <= end)) {
2176+
// very likely MB-34173, log this occurrence.
2177+
// log the state, range and version
2178+
logger.log(EXTENSION_LOG_WARNING,
2179+
"CouchKVStore::processVbstateSnapshot vb:%" PRIu16
2180+
" %s with invalid snapshot range. Found version:%" PRId64
2181+
", "
2182+
"highSeqno:%" PRIu64 ", start:%" PRIu64 ", end:%" PRIu64,
2183+
vb,
2184+
VBucket::toString(state),
2185+
version,
2186+
highSeqno,
2187+
start,
2188+
end);
2189+
2190+
if (state == vbucket_state_active) {
2191+
// Reset the snapshot range to match what the flusher would
2192+
// normally set, that is start and end equal the high-seqno
2193+
start = end = highSeqno;
2194+
} else {
2195+
// Flag that the VB is corrupt, it needs rebuilding
2196+
status = ReadVBStateStatus::CorruptSnapshot;
2197+
start = 0, end = 0;
2198+
}
2199+
}
2200+
2201+
return {status, snapshot_info_t{highSeqno, {start, end}}};
2202+
}
2203+
2204+
CouchKVStore::ReadVBStateStatus CouchKVStore::readVBState(Db* db,
2205+
uint16_t vbId) {
21612206
sized_buf id;
21622207
LocalDoc *ldoc = NULL;
2163-
couchstore_error_t errCode = COUCHSTORE_SUCCESS;
2208+
ReadVBStateStatus status = ReadVBStateStatus::Success;
21642209
vbucket_state_t state = vbucket_state_dead;
21652210
uint64_t checkpointId = 0;
21662211
uint64_t maxDeletedSeqno = 0;
21672212
int64_t highSeqno = 0;
21682213
std::string failovers;
21692214
uint64_t purgeSeqno = 0;
2170-
uint64_t lastSnapStart = 0;
2171-
uint64_t lastSnapEnd = 0;
2215+
snapshot_info_t snapshot{0, {0, 0}};
21722216
uint64_t maxCas = 0;
21732217
int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
21742218
bool mightContainXattrs = false;
21752219

21762220
DbInfo info;
2177-
errCode = couchstore_db_info(db, &info);
2178-
if (errCode == COUCHSTORE_SUCCESS) {
2179-
highSeqno = info.last_sequence;
2180-
purgeSeqno = info.purge_seq;
2181-
} else {
2221+
auto couchStoreStatus = couchstore_db_info(db, &info);
2222+
if (couchStoreStatus != COUCHSTORE_SUCCESS) {
21822223
logger.log(EXTENSION_LOG_WARNING,
21832224
"CouchKVStore::readVBState: couchstore_db_info error:%s"
2184-
", vb:%" PRIu16, couchstore_strerror(errCode), vbId);
2185-
return couchErr2EngineErr(errCode);
2225+
", vb:%" PRIu16,
2226+
couchstore_strerror(couchStoreStatus),
2227+
vbId);
2228+
return ReadVBStateStatus::CouchstoreError;
21862229
}
21872230

2231+
highSeqno = info.last_sequence;
2232+
purgeSeqno = info.purge_seq;
2233+
21882234
id.buf = (char *)"_local/vbstate";
21892235
id.size = sizeof("_local/vbstate") - 1;
2190-
errCode = couchstore_open_local_document(db, (void *)id.buf,
2191-
id.size, &ldoc);
2192-
if (errCode != COUCHSTORE_SUCCESS) {
2193-
if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2194-
logger.log(EXTENSION_LOG_NOTICE,
2195-
"CouchKVStore::readVBState: '_local/vbstate' not found "
2196-
"for vb:%d", vbId);
2197-
} else {
2198-
logger.log(EXTENSION_LOG_WARNING,
2199-
"CouchKVStore::readVBState: couchstore_open_local_document"
2200-
" error:%s, vb:%" PRIu16, couchstore_strerror(errCode),
2201-
vbId);
2202-
}
2203-
} else {
2236+
couchStoreStatus =
2237+
couchstore_open_local_document(db, (void*)id.buf, id.size, &ldoc);
2238+
if (couchStoreStatus == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2239+
logger.log(EXTENSION_LOG_NOTICE,
2240+
"CouchKVStore::readVBState: '_local/vbstate' not found "
2241+
"for vb:%d",
2242+
vbId);
2243+
} else if (couchStoreStatus != COUCHSTORE_SUCCESS) {
2244+
logger.log(EXTENSION_LOG_WARNING,
2245+
"CouchKVStore::readVBState: couchstore_open_local_document"
2246+
" error:%s, vb:%" PRIu16,
2247+
couchstore_strerror(couchStoreStatus),
2248+
vbId);
2249+
return ReadVBStateStatus::CouchstoreError;
2250+
}
2251+
2252+
if (couchStoreStatus == COUCHSTORE_SUCCESS) {
22042253
const std::string statjson(ldoc->json.buf, ldoc->json.size);
2205-
cJSON *jsonObj = cJSON_Parse(statjson.c_str());
2254+
cJSON* jsonObj = cJSON_Parse(statjson.c_str());
22062255
if (!jsonObj) {
22072256
couchstore_free_local_document(ldoc);
2208-
logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: Failed to "
2257+
logger.log(EXTENSION_LOG_WARNING,
2258+
"CouchKVStore::readVBState: Failed to "
22092259
"parse the vbstat json doc for vb:%" PRIu16 ", json:%s",
2210-
vbId , statjson.c_str());
2211-
return couchErr2EngineErr(errCode);
2260+
vbId,
2261+
statjson.c_str());
2262+
return ReadVBStateStatus::JsonInvalid;
22122263
}
22132264

2214-
const std::string vb_state = getJSONObjString(
2215-
cJSON_GetObjectItem(jsonObj, "state"));
2216-
const std::string checkpoint_id = getJSONObjString(
2217-
cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
2265+
const auto version =
2266+
getJSONObjInt64(cJSON_GetObjectItem(jsonObj, "version"));
2267+
const std::string vb_state =
2268+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
2269+
const std::string checkpoint_id =
2270+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
22182271
const std::string max_deleted_seqno = getJSONObjString(
2219-
cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
2220-
const std::string snapStart = getJSONObjString(
2221-
cJSON_GetObjectItem(jsonObj, "snap_start"));
2222-
const std::string snapEnd = getJSONObjString(
2223-
cJSON_GetObjectItem(jsonObj, "snap_end"));
2224-
const std::string maxCasValue = getJSONObjString(
2225-
cJSON_GetObjectItem(jsonObj, "max_cas"));
2272+
cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
2273+
const std::string snapStart =
2274+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
2275+
const std::string snapEnd =
2276+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
2277+
const std::string maxCasValue =
2278+
getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
22262279
const std::string hlcCasEpoch =
22272280
getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
22282281
mightContainXattrs = getJSONObjBool(
22292282
cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
22302283

2231-
cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
2232-
if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
2233-
|| max_deleted_seqno.compare("") == 0) {
2234-
logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: State"
2235-
" JSON doc for vb:%" PRIu16 " is in the wrong format:%s, "
2284+
cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
2285+
if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
2286+
max_deleted_seqno.compare("") == 0) {
2287+
logger.log(EXTENSION_LOG_WARNING,
2288+
"CouchKVStore::readVBState: State"
2289+
" JSON doc for vb:%" PRIu16
2290+
" is in the wrong format:%s, "
22362291
"vb state:%s, checkpoint id:%s and max deleted seqno:%s",
2237-
vbId, statjson.c_str(), vb_state.c_str(),
2238-
checkpoint_id.c_str(), max_deleted_seqno.c_str());
2292+
vbId,
2293+
statjson.c_str(),
2294+
vb_state.c_str(),
2295+
checkpoint_id.c_str(),
2296+
max_deleted_seqno.c_str());
22392297
} else {
22402298
state = VBucket::fromString(vb_state.c_str());
22412299
parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
22422300
parseUint64(checkpoint_id.c_str(), &checkpointId);
22432301

2244-
if (snapStart.compare("") == 0) {
2245-
lastSnapStart = highSeqno;
2246-
} else {
2247-
parseUint64(snapStart.c_str(), &lastSnapStart);
2248-
}
2249-
2250-
if (snapEnd.compare("") == 0) {
2251-
lastSnapEnd = highSeqno;
2252-
} else {
2253-
parseUint64(snapEnd.c_str(), &lastSnapEnd);
2254-
}
2255-
22562302
if (maxCasValue.compare("") != 0) {
22572303
parseUint64(maxCasValue.c_str(), &maxCas);
22582304

2259-
// MB-17517: If the maxCas on disk was invalid then don't use it -
2260-
// instead rebuild from the items we load from disk (i.e. as per
2261-
// an upgrade from an earlier version).
2305+
// MB-17517: If the maxCas on disk was invalid then don't use it
2306+
// - instead rebuild from the items we load from disk (i.e. as
2307+
// per an upgrade from an earlier version).
22622308
if (maxCas == static_cast<uint64_t>(-1)) {
22632309
logger.log(EXTENSION_LOG_WARNING,
22642310
"CouchKVStore::readVBState: Invalid max_cas "
22652311
"(0x%" PRIx64 ") read from '%s' for vb:%" PRIu16
22662312
". Resetting max_cas to zero.",
2267-
maxCas, id.buf, vbId);
2313+
maxCas,
2314+
id.buf,
2315+
vbId);
22682316
maxCas = 0;
22692317
}
22702318
}
@@ -2276,24 +2324,36 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
22762324
if (failover_json) {
22772325
failovers = to_string(failover_json, false);
22782326
}
2327+
2328+
std::tie(status, snapshot) =
2329+
processVbstateSnapshot(vbId,
2330+
state,
2331+
version,
2332+
snapStart,
2333+
snapEnd,
2334+
uint64_t(highSeqno));
22792335
}
2336+
22802337
cJSON_Delete(jsonObj);
22812338
couchstore_free_local_document(ldoc);
22822339
}
22832340

2284-
cachedVBStates[vbId] = std::make_unique<vbucket_state>(state,
2285-
checkpointId,
2286-
maxDeletedSeqno,
2287-
highSeqno,
2288-
purgeSeqno,
2289-
lastSnapStart,
2290-
lastSnapEnd,
2291-
maxCas,
2292-
hlcCasEpochSeqno,
2293-
mightContainXattrs,
2294-
failovers);
2295-
2296-
return couchErr2EngineErr(errCode);
2341+
// If snapshot is not initialised do not create a vbucket state
2342+
if (status == ReadVBStateStatus::Success) {
2343+
cachedVBStates[vbId] =
2344+
std::make_unique<vbucket_state>(state,
2345+
checkpointId,
2346+
maxDeletedSeqno,
2347+
snapshot.start,
2348+
purgeSeqno,
2349+
snapshot.range.start,
2350+
snapshot.range.end,
2351+
maxCas,
2352+
hlcCasEpochSeqno,
2353+
mightContainXattrs,
2354+
failovers);
2355+
}
2356+
return status;
22972357
}
22982358

22992359
couchstore_error_t CouchKVStore::saveVBState(Db *db,
@@ -2309,7 +2369,8 @@ couchstore_error_t CouchKVStore::saveVBState(Db *db,
23092369
jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
23102370
<< ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
23112371
<< ",\"max_cas\": \"" << vbState.maxCas << "\""
2312-
<< ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
2372+
<< ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\""
2373+
<< ",\"version\": 2";
23132374

23142375
if (vbState.mightContainXattrs) {
23152376
jsonState << ",\"might_contain_xattrs\": true";
@@ -2718,7 +2779,7 @@ RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
27182779
return RollbackResult(false, 0, 0, 0);
27192780
}
27202781

2721-
if (readVBState(newdb, vbid) != ENGINE_SUCCESS) {
2782+
if (readVBState(newdb, vbid) != ReadVBStateStatus::Success) {
27222783
return RollbackResult(false, 0, 0, 0);
27232784
}
27242785
cachedDeleteCount[vbid] = info.deleted_count;

engines/ep/src/couch-kvstore/couch-kvstore.h

Lines changed: 21 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -410,7 +410,27 @@ class CouchKVStore : public KVStore
410410
static int recordDbDump(Db *db, DocInfo *docinfo, void *ctx);
411411
static int recordDbStat(Db *db, DocInfo *docinfo, void *ctx);
412412
static int getMultiCb(Db *db, DocInfo *docinfo, void *ctx);
413-
ENGINE_ERROR_CODE readVBState(Db *db, uint16_t vbId);
413+
414+
enum class ReadVBStateStatus {
415+
Success,
416+
JsonInvalid,
417+
CorruptSnapshot,
418+
CouchstoreError
419+
};
420+
421+
ReadVBStateStatus readVBState(Db* db, uint16_t vbId);
422+
423+
/**
424+
* Process the vbstate snapshot strings which are stored in the vbstate
425+
* document. Check for validity and return a status + decoded snapshot.
426+
*/
427+
std::pair<ReadVBStateStatus, snapshot_info_t> processVbstateSnapshot(
428+
uint16_t vb,
429+
vbucket_state_t state,
430+
int64_t version,
431+
const std::string& snapStart,
432+
const std::string& snapEnd,
433+
uint64_t highSeqno);
414434

415435
couchstore_error_t fetchDoc(Db* db,
416436
DocInfo* docinfo,

0 commit comments

Comments
 (0)