@@ -346,9 +346,10 @@ void CouchKVStore::initialize() {
346346 DbHolder db (*this );
347347 errorCode = openDB (id, db, COUCHSTORE_OPEN_FLAG_RDONLY);
348348 if (errorCode == COUCHSTORE_SUCCESS) {
349- readVBState (db, id);
350- /* update stat */
351- ++st.numLoadedVb ;
349+ if (readVBState (db, id) == ReadVBStateStatus::Success) {
350+ /* update stat */
351+ ++st.numLoadedVb ;
352+ }
352353 } else {
353354 logger.log (EXTENSION_LOG_WARNING,
354355 " CouchKVStore::initialize: openDB"
@@ -2157,114 +2158,161 @@ void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
21572158 }
21582159}
21592160
2160- ENGINE_ERROR_CODE CouchKVStore::readVBState (Db *db, uint16_t vbId) {
// Derive the snapshot range to use for a vbucket from the textual snapshot
// start/end values and the high seqno handed in by the caller.
//
// vb         vbucket id; only used in the warning message.
// state      vbucket state; decides how an out-of-range snapshot is handled.
// version    only used in the warning message.
// snapStart  textual snapshot start, parsed with parseUint64.
// snapEnd    textual snapshot end, parsed with parseUint64.
// highSeqno  high seqno the parsed range is validated against.
//
// Returns {status, snapshot_info_t{highSeqno, {start, end}}} where:
//  - if either string fails to parse, start == end == highSeqno and the
//    status is Success;
//  - if both parse and start <= highSeqno <= end, the parsed range is
//    returned unchanged with status Success;
//  - if both parse but highSeqno lies outside [start, end], a warning is
//    logged; an active vbucket gets start == end == highSeqno (Success), any
//    other state gets a {0, 0} range and status CorruptSnapshot.
2161+ std::pair<CouchKVStore::ReadVBStateStatus, snapshot_info_t >
2162+ CouchKVStore::processVbstateSnapshot (uint16_t vb,
2163+ vbucket_state_t state,
2164+ int64_t version,
2165+ const std::string& snapStart,
2166+ const std::string& snapEnd,
2167+ uint64_t highSeqno) {
2168+ ReadVBStateStatus status = ReadVBStateStatus::Success;
2169+
2170+ // Both snapshot start and end are expected to be present on every
2171+ uint64_t start = 0 , end = 0 ;
// upgrade path; if either one fails to parse, fall back to treating the
// high seqno as both ends of the range.
// NOTE(review): presumably a missing field arrives as an empty string and
// fails parseUint64 - confirm against parseUint64.
2172+ if (!parseUint64 (snapStart.c_str (), &start) ||
2173+ !parseUint64 (snapEnd.c_str (), &end)) {
2174+ start = end = uint64_t (highSeqno);
2175+ } else if (!(highSeqno >= start && highSeqno <= end)) {
2176+ // highSeqno is outside [start, end]: very likely MB-34173.
2177+ // Log the vbucket, its state, the version and the offending range.
2178+ logger.log (EXTENSION_LOG_WARNING,
2179+ " CouchKVStore::processVbstateSnapshot vb:%" PRIu16
2180+ " %s with invalid snapshot range. Found version:%" PRId64
2181+ " , "
2182+ " highSeqno:%" PRIu64 " , start:%" PRIu64 " , end:%" PRIu64,
2183+ vb,
2184+ VBucket::toString (state),
2185+ version,
2186+ highSeqno,
2187+ start,
2188+ end);
2189+
2190+ if (state == vbucket_state_active) {
2191+ // Reset the snapshot range to match what the flusher would
2192+ // normally set, that is start and end equal the high-seqno
2193+ start = end = highSeqno;
2194+ } else {
2195+ // Any non-active state: flag the VB as corrupt (it needs
2196+ status = ReadVBStateStatus::CorruptSnapshot;
// rebuilding) and hand back a zeroed range alongside the error status.
2197+ start = 0 , end = 0 ;
2198+ }
2199+ }
2200+
2201+ return {status, snapshot_info_t {highSeqno, {start, end}}};
2202+ }
2203+
2204+ CouchKVStore::ReadVBStateStatus CouchKVStore::readVBState (Db* db,
2205+ uint16_t vbId) {
21612206 sized_buf id;
21622207 LocalDoc *ldoc = NULL ;
2163- couchstore_error_t errCode = COUCHSTORE_SUCCESS ;
2208+ ReadVBStateStatus status = ReadVBStateStatus::Success ;
21642209 vbucket_state_t state = vbucket_state_dead;
21652210 uint64_t checkpointId = 0 ;
21662211 uint64_t maxDeletedSeqno = 0 ;
21672212 int64_t highSeqno = 0 ;
21682213 std::string failovers;
21692214 uint64_t purgeSeqno = 0 ;
2170- uint64_t lastSnapStart = 0 ;
2171- uint64_t lastSnapEnd = 0 ;
2215+ snapshot_info_t snapshot{0 , {0 , 0 }};
21722216 uint64_t maxCas = 0 ;
21732217 int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
21742218 bool mightContainXattrs = false ;
21752219
21762220 DbInfo info;
2177- errCode = couchstore_db_info (db, &info);
2178- if (errCode == COUCHSTORE_SUCCESS) {
2179- highSeqno = info.last_sequence ;
2180- purgeSeqno = info.purge_seq ;
2181- } else {
2221+ auto couchStoreStatus = couchstore_db_info (db, &info);
2222+ if (couchStoreStatus != COUCHSTORE_SUCCESS) {
21822223 logger.log (EXTENSION_LOG_WARNING,
21832224 " CouchKVStore::readVBState: couchstore_db_info error:%s"
2184- " , vb:%" PRIu16, couchstore_strerror (errCode), vbId);
2185- return couchErr2EngineErr (errCode);
2225+ " , vb:%" PRIu16,
2226+ couchstore_strerror (couchStoreStatus),
2227+ vbId);
2228+ return ReadVBStateStatus::CouchstoreError;
21862229 }
21872230
2231+ highSeqno = info.last_sequence ;
2232+ purgeSeqno = info.purge_seq ;
2233+
21882234 id.buf = (char *)" _local/vbstate" ;
21892235 id.size = sizeof (" _local/vbstate" ) - 1 ;
2190- errCode = couchstore_open_local_document (db, (void *)id.buf ,
2191- id.size , &ldoc);
2192- if (errCode != COUCHSTORE_SUCCESS) {
2193- if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2194- logger.log (EXTENSION_LOG_NOTICE,
2195- " CouchKVStore::readVBState: '_local/vbstate' not found "
2196- " for vb:%d" , vbId);
2197- } else {
2198- logger.log (EXTENSION_LOG_WARNING,
2199- " CouchKVStore::readVBState: couchstore_open_local_document"
2200- " error:%s, vb:%" PRIu16, couchstore_strerror (errCode),
2201- vbId);
2202- }
2203- } else {
2236+ couchStoreStatus =
2237+ couchstore_open_local_document (db, (void *)id.buf , id.size , &ldoc);
2238+ if (couchStoreStatus == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2239+ logger.log (EXTENSION_LOG_NOTICE,
2240+ " CouchKVStore::readVBState: '_local/vbstate' not found "
2241+ " for vb:%d" ,
2242+ vbId);
2243+ } else if (couchStoreStatus != COUCHSTORE_SUCCESS) {
2244+ logger.log (EXTENSION_LOG_WARNING,
2245+ " CouchKVStore::readVBState: couchstore_open_local_document"
2246+ " error:%s, vb:%" PRIu16,
2247+ couchstore_strerror (couchStoreStatus),
2248+ vbId);
2249+ return ReadVBStateStatus::CouchstoreError;
2250+ }
2251+
2252+ if (couchStoreStatus == COUCHSTORE_SUCCESS) {
22042253 const std::string statjson (ldoc->json .buf , ldoc->json .size );
2205- cJSON * jsonObj = cJSON_Parse (statjson.c_str ());
2254+ cJSON* jsonObj = cJSON_Parse (statjson.c_str ());
22062255 if (!jsonObj) {
22072256 couchstore_free_local_document (ldoc);
2208- logger.log (EXTENSION_LOG_WARNING, " CouchKVStore::readVBState: Failed to "
2257+ logger.log (EXTENSION_LOG_WARNING,
2258+ " CouchKVStore::readVBState: Failed to "
22092259 " parse the vbstat json doc for vb:%" PRIu16 " , json:%s" ,
2210- vbId , statjson.c_str ());
2211- return couchErr2EngineErr (errCode);
2260+ vbId,
2261+ statjson.c_str ());
2262+ return ReadVBStateStatus::JsonInvalid;
22122263 }
22132264
2214- const std::string vb_state = getJSONObjString (
2215- cJSON_GetObjectItem (jsonObj, " state" ));
2216- const std::string checkpoint_id = getJSONObjString (
2217- cJSON_GetObjectItem (jsonObj," checkpoint_id" ));
2265+ const auto version =
2266+ getJSONObjInt64 (cJSON_GetObjectItem (jsonObj, " version" ));
2267+ const std::string vb_state =
2268+ getJSONObjString (cJSON_GetObjectItem (jsonObj, " state" ));
2269+ const std::string checkpoint_id =
2270+ getJSONObjString (cJSON_GetObjectItem (jsonObj, " checkpoint_id" ));
22182271 const std::string max_deleted_seqno = getJSONObjString (
2219- cJSON_GetObjectItem (jsonObj, " max_deleted_seqno" ));
2220- const std::string snapStart = getJSONObjString (
2221- cJSON_GetObjectItem (jsonObj, " snap_start" ));
2222- const std::string snapEnd = getJSONObjString (
2223- cJSON_GetObjectItem (jsonObj, " snap_end" ));
2224- const std::string maxCasValue = getJSONObjString (
2225- cJSON_GetObjectItem (jsonObj, " max_cas" ));
2272+ cJSON_GetObjectItem (jsonObj, " max_deleted_seqno" ));
2273+ const std::string snapStart =
2274+ getJSONObjString ( cJSON_GetObjectItem (jsonObj, " snap_start" ));
2275+ const std::string snapEnd =
2276+ getJSONObjString ( cJSON_GetObjectItem (jsonObj, " snap_end" ));
2277+ const std::string maxCasValue =
2278+ getJSONObjString ( cJSON_GetObjectItem (jsonObj, " max_cas" ));
22262279 const std::string hlcCasEpoch =
22272280 getJSONObjString (cJSON_GetObjectItem (jsonObj, " hlc_epoch" ));
22282281 mightContainXattrs = getJSONObjBool (
22292282 cJSON_GetObjectItem (jsonObj, " might_contain_xattrs" ));
22302283
2231- cJSON *failover_json = cJSON_GetObjectItem (jsonObj, " failover_table" );
2232- if (vb_state.compare (" " ) == 0 || checkpoint_id.compare (" " ) == 0
2233- || max_deleted_seqno.compare (" " ) == 0 ) {
2234- logger.log (EXTENSION_LOG_WARNING, " CouchKVStore::readVBState: State"
2235- " JSON doc for vb:%" PRIu16 " is in the wrong format:%s, "
2284+ cJSON* failover_json = cJSON_GetObjectItem (jsonObj, " failover_table" );
2285+ if (vb_state.compare (" " ) == 0 || checkpoint_id.compare (" " ) == 0 ||
2286+ max_deleted_seqno.compare (" " ) == 0 ) {
2287+ logger.log (EXTENSION_LOG_WARNING,
2288+ " CouchKVStore::readVBState: State"
2289+ " JSON doc for vb:%" PRIu16
2290+ " is in the wrong format:%s, "
22362291 " vb state:%s, checkpoint id:%s and max deleted seqno:%s" ,
2237- vbId, statjson.c_str (), vb_state.c_str (),
2238- checkpoint_id.c_str (), max_deleted_seqno.c_str ());
2292+ vbId,
2293+ statjson.c_str (),
2294+ vb_state.c_str (),
2295+ checkpoint_id.c_str (),
2296+ max_deleted_seqno.c_str ());
22392297 } else {
22402298 state = VBucket::fromString (vb_state.c_str ());
22412299 parseUint64 (max_deleted_seqno.c_str (), &maxDeletedSeqno);
22422300 parseUint64 (checkpoint_id.c_str (), &checkpointId);
22432301
2244- if (snapStart.compare (" " ) == 0 ) {
2245- lastSnapStart = highSeqno;
2246- } else {
2247- parseUint64 (snapStart.c_str (), &lastSnapStart);
2248- }
2249-
2250- if (snapEnd.compare (" " ) == 0 ) {
2251- lastSnapEnd = highSeqno;
2252- } else {
2253- parseUint64 (snapEnd.c_str (), &lastSnapEnd);
2254- }
2255-
22562302 if (maxCasValue.compare (" " ) != 0 ) {
22572303 parseUint64 (maxCasValue.c_str (), &maxCas);
22582304
2259- // MB-17517: If the maxCas on disk was invalid then don't use it -
2260- // instead rebuild from the items we load from disk (i.e. as per
2261- // an upgrade from an earlier version).
2305+ // MB-17517: If the maxCas on disk was invalid then don't use it
2306+ // - instead rebuild from the items we load from disk (i.e. as
2307+ // per an upgrade from an earlier version).
22622308 if (maxCas == static_cast <uint64_t >(-1 )) {
22632309 logger.log (EXTENSION_LOG_WARNING,
22642310 " CouchKVStore::readVBState: Invalid max_cas "
22652311 " (0x%" PRIx64 " ) read from '%s' for vb:%" PRIu16
22662312 " . Resetting max_cas to zero." ,
2267- maxCas, id.buf , vbId);
2313+ maxCas,
2314+ id.buf ,
2315+ vbId);
22682316 maxCas = 0 ;
22692317 }
22702318 }
@@ -2276,24 +2324,36 @@ ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
22762324 if (failover_json) {
22772325 failovers = to_string (failover_json, false );
22782326 }
2327+
2328+ std::tie (status, snapshot) =
2329+ processVbstateSnapshot (vbId,
2330+ state,
2331+ version,
2332+ snapStart,
2333+ snapEnd,
2334+ uint64_t (highSeqno));
22792335 }
2336+
22802337 cJSON_Delete (jsonObj);
22812338 couchstore_free_local_document (ldoc);
22822339 }
22832340
2284- cachedVBStates[vbId] = std::make_unique<vbucket_state>(state,
2285- checkpointId,
2286- maxDeletedSeqno,
2287- highSeqno,
2288- purgeSeqno,
2289- lastSnapStart,
2290- lastSnapEnd,
2291- maxCas,
2292- hlcCasEpochSeqno,
2293- mightContainXattrs,
2294- failovers);
2295-
2296- return couchErr2EngineErr (errCode);
2341+ // If snapshot is not initialised do not create a vbucket state
2342+ if (status == ReadVBStateStatus::Success) {
2343+ cachedVBStates[vbId] =
2344+ std::make_unique<vbucket_state>(state,
2345+ checkpointId,
2346+ maxDeletedSeqno,
2347+ snapshot.start ,
2348+ purgeSeqno,
2349+ snapshot.range .start ,
2350+ snapshot.range .end ,
2351+ maxCas,
2352+ hlcCasEpochSeqno,
2353+ mightContainXattrs,
2354+ failovers);
2355+ }
2356+ return status;
22972357}
22982358
22992359couchstore_error_t CouchKVStore::saveVBState (Db *db,
@@ -2309,7 +2369,8 @@ couchstore_error_t CouchKVStore::saveVBState(Db *db,
23092369 jsonState << " ,\" snap_start\" : \" " << vbState.lastSnapStart << " \" "
23102370 << " ,\" snap_end\" : \" " << vbState.lastSnapEnd << " \" "
23112371 << " ,\" max_cas\" : \" " << vbState.maxCas << " \" "
2312- << " ,\" hlc_epoch\" : \" " << vbState.hlcCasEpochSeqno << " \" " ;
2372+ << " ,\" hlc_epoch\" : \" " << vbState.hlcCasEpochSeqno << " \" "
2373+ << " ,\" version\" : 2" ;
23132374
23142375 if (vbState.mightContainXattrs ) {
23152376 jsonState << " ,\" might_contain_xattrs\" : true" ;
@@ -2718,7 +2779,7 @@ RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
27182779 return RollbackResult (false , 0 , 0 , 0 );
27192780 }
27202781
2721- if (readVBState (newdb, vbid) != ENGINE_SUCCESS ) {
2782+ if (readVBState (newdb, vbid) != ReadVBStateStatus::Success ) {
27222783 return RollbackResult (false , 0 , 0 , 0 );
27232784 }
27242785 cachedDeleteCount[vbid] = info.deleted_count ;
0 commit comments