Skip to content

Commit 41a54a2

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-41942: Persist the VB state earlier during warmup
Currently we attempt to persist the vBucket state(s) when we complete warmup. This leads to a window during warmup in which we can write new items (via expirations) after identifying that we need to generate a new FailoverTable entry but before we persist it. If we then crash or shut down in this period then we can fail to roll back any replica nodes. This can in turn cause data loss/data corruption. Correct this by persisting the vBucket state during warmup as soon as we know that we need to generate a new FailoverTable entry. Change-Id: Iea5eb60b29ae4ba1d66e83d5b5e4f2b44d516c26 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/142080 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 9c481aa commit 41a54a2

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed

engines/ep/src/ep_bucket.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1869,10 +1869,6 @@ bool EPBucket::maybeEnableTraffic() {
18691869
}
18701870

18711871
void EPBucket::warmupCompleted() {
1872-
// Snapshot VBucket state after warmup to ensure Failover table is
1873-
// persisted.
1874-
scheduleVBStatePersist();
1875-
18761872
if (engine.getConfiguration().getAlogPath().length() > 0) {
18771873
if (engine.getConfiguration().isAccessScannerEnabled()) {
18781874
{

engines/ep/src/warmup.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,12 @@ void Warmup::createVBuckets(uint16_t shardId) {
10631063
} else {
10641064
vb->failovers->createEntry(vbs.lastSnapStart);
10651065
}
1066+
1067+
// MB-41942: Persist the latest VB state to ensure that the
1068+
// FailoverTable is correct if we write any new seqnos during
1069+
// warmup (e.g. due to expiration)
1070+
vb->checkpointManager->queueSetVBState(*vb);
1071+
10661072
auto entry = vb->failovers->getLatestEntry();
10671073
EP_LOG_INFO(
10681074
"Warmup::createVBuckets: {} created new failover entry "

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "tests/test_fileops.h"
5353
#include "vbucket_state.h"
5454
#include "vbucket_bgfetch_item.h"
55+
#include "warmup.h"
5556

5657
#include "../couchstore/src/internal.h"
5758

@@ -4903,6 +4904,119 @@ void STParamPersistentBucketTest::testAbortDoesNotIncrementOpsDelete(
49034904
EXPECT_EQ(0, vb.opsDelete);
49044905
}
49054906

4907+
4908+
void STParamPersistentBucketTest::testFailoverTableEntryPersistedAtWarmup(std::function<void()> testFunction) {
4909+
// 1) Store something so we can expire it later
4910+
engine->getKVBucket()->setVBucketState(vbid, vbucket_state_active);
4911+
auto vb = engine->getKVBucket()->getVBucket(vbid);
4912+
4913+
// Grab initialUuid for testing
4914+
auto initialUuid = vb->failovers->getLatestUUID();
4915+
4916+
auto key = makeStoredDocKey("key");
4917+
store_item(vbid, key, "value", 1 /*expiryTime*/);
4918+
flushVBucketToDiskIfPersistent(vbid, 1);
4919+
EXPECT_EQ(1, vb->getHighSeqno());
4920+
4921+
// 2) Restart as though we had an unclean shutdown (creating a new failover
4922+
// table entry) and run the warmup up to the point of completion.
4923+
vb.reset();
4924+
resetEngineAndEnableWarmup();
4925+
4926+
auto& readerQueue = *task_executor->getLpTaskQ()[READER_TASK_IDX];
4927+
auto* warmup = engine->getKVBucket()->getWarmup();
4928+
ASSERT_TRUE(warmup);
4929+
4930+
// Warmup - load everything but don't run the complete phase which schedules
4931+
// persistence of the vBucket state (new failover entry)
4932+
while (warmup->getWarmupState() != WarmupState::State::Done) {
4933+
runNextTask(readerQueue);
4934+
}
4935+
4936+
// 3) Test
4937+
testFunction();
4938+
4939+
// New high seqno
4940+
vb = engine->getKVBucket()->getVBucket(vbid);
4941+
EXPECT_EQ(2, vb->getHighSeqno());
4942+
4943+
// Flush the expiry
4944+
flushVBucketToDiskIfPersistent(vbid, 1);
4945+
4946+
// Verify that the item has been expired
4947+
auto options = static_cast<get_options_t>(
4948+
QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
4949+
HIDE_LOCKED_CAS | TRACK_STATISTICS);
4950+
auto gv = store->get(key, vbid, cookie, options);
4951+
4952+
if (gv.getStatus() == ENGINE_EWOULDBLOCK) {
4953+
runBGFetcherTask();
4954+
gv = store->get(key, vbid, cookie, options);
4955+
}
4956+
4957+
EXPECT_EQ(ENGINE_KEY_ENOENT, gv.getStatus());
4958+
4959+
// Get our new uuid now
4960+
auto secondUuid = vb->failovers->getLatestUUID();
4961+
ASSERT_NE(initialUuid, secondUuid);
4962+
4963+
// "Complete" the warmup or the test will get stuck shutting down, we won't
4964+
// actually flush the new vb state though so we're still testing as though
4965+
// this didn't happen
4966+
runNextTask(readerQueue);
4967+
4968+
// 4) Restart again
4969+
vb.reset();
4970+
resetEngineAndWarmup();
4971+
4972+
// 5) The test - uuid should have both of the previous entries
4973+
vb = engine->getKVBucket()->getVBucket(vbid);
4974+
4975+
auto failovers = vb->failovers->getFailoverLog();
4976+
auto itr = std::find_if(failovers.begin(), failovers.end(), [&initialUuid](const auto& failoverEntry) {
4977+
return failoverEntry.uuid == initialUuid;
4978+
});
4979+
EXPECT_NE(itr, failovers.end());
4980+
4981+
itr = std::find_if(failovers.begin(), failovers.end(), [&secondUuid](const auto& failoverEntry) {
4982+
return failoverEntry.uuid == secondUuid;
4983+
});
4984+
EXPECT_NE(itr, failovers.end());
4985+
4986+
EXPECT_EQ(2, vb->getHighSeqno());
4987+
gv = store->get(key, vbid, cookie, options);
4988+
4989+
if (gv.getStatus() == ENGINE_EWOULDBLOCK) {
4990+
runBGFetcherTask();
4991+
gv = store->get(key, vbid, cookie, options);
4992+
}
4993+
}
4994+
4995+
TEST_P(STParamPersistentBucketTest, TestExpiryDueToCompactionPersistsFailoverTableEntryDuringWarmup) {
4996+
testFailoverTableEntryPersistedAtWarmup([this]() {
4997+
CompactionConfig config;
4998+
engine->compactDB(vbid, config, cookie);
4999+
std::string taskDescription =
5000+
"Compact DB file " + std::to_string(vbid.get());
5001+
runNextTask(*task_executor->getLpTaskQ()[WRITER_TASK_IDX], taskDescription);
5002+
});
5003+
}
5004+
5005+
TEST_P(STParamPersistentBucketTest, TestExpiryDueToGetPersistsFailoverTableEntryDuringWarmup) {
5006+
testFailoverTableEntryPersistedAtWarmup([this]() {
5007+
auto options = static_cast<get_options_t>(
5008+
QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
5009+
HIDE_LOCKED_CAS | TRACK_STATISTICS);
5010+
auto key = makeStoredDocKey("key");
5011+
auto gv = store->get(key, vbid, cookie, options);
5012+
5013+
if (gv.getStatus() == ENGINE_EWOULDBLOCK) {
5014+
runBGFetcherTask();
5015+
gv = store->get(key, vbid, cookie, options);
5016+
}
5017+
});
5018+
}
5019+
49065020
TEST_P(STParamPersistentBucketTest, AbortDoesNotIncrementOpsDelete) {
49075021
testAbortDoesNotIncrementOpsDelete(true /*flusherDedup*/);
49085022
}
@@ -4992,4 +5106,4 @@ TEST_P(STParamPersistentBucketTest, ExpiryFindsPrepareWithSameCas) {
49925106
INSTANTIATE_TEST_CASE_P(Persistent,
49935107
STParamPersistentBucketTest,
49945108
STParameterizedBucketTest::persistentConfigValues(),
4995-
STParameterizedBucketTest::PrintToStringParamName);
5109+
STParameterizedBucketTest::PrintToStringParamName);

engines/ep/tests/module_tests/evp_store_single_threaded_test.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,6 @@ class STParameterizedBucketTest
338338
class STParamPersistentBucketTest : public STParameterizedBucketTest {
339339
protected:
340340
void testAbortDoesNotIncrementOpsDelete(bool flusherDedup);
341+
342+
void testFailoverTableEntryPersistedAtWarmup(std::function<void()>);
341343
};

0 commit comments

Comments
 (0)