Skip to content

Commit 9664b23

Browse files
paolococchi authored and daverigby committed
MB-51408: Don't miss closing the open checkpoint at memory recovery
There are a couple of DEV_ASSERTS in the CheckpointMemRecoveryTask that wrap an important step of the checkpoint memory recovery logic: closing the open checkpoint and creating a new one, if possible. Recently we set DEV_ASSERTS=OFF for Neo, which caused the task to miss that step. The consequence is that we might enter unrecoverable TempOOM states, e.g.:
1. Store mutation A in vb1/checkpoint1 - no OOM
2. Store mutation B in vb2/checkpoint1 - OOM
3. CheckpointMemRecoveryTask runs and doesn't close the open checkpoints
4. The same task can't expel anything as we never expel high-seqno
5. Unrecoverable OOM
This patch fixes the issue by restoring the checkpoint-closing step that was skipped in (3). Covered by unit tests for both persistent/ephemeral.
Change-Id: Id51b837bd9500a26c89ae7999a8a4a60cd1930b5
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/172215
Tested-by: Build Bot <[email protected]>
Reviewed-by: James H <[email protected]>
Well-Formed: Restriction Checker
1 parent 7f23966 commit 9664b23

File tree

6 files changed

+88
-73
lines changed

6 files changed

+88
-73
lines changed

engines/ep/src/checkpoint_config.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ class CheckpointConfig {
5555
return checkpointRemovalMode;
5656
}
5757

58+
// @todo: Test only. Remove as soon as param made dynamic in EPConfig.
59+
void setCheckpointRemovalMode(CheckpointRemoval mode) {
60+
checkpointRemovalMode = mode;
61+
}
62+
5863
protected:
5964
friend class EventuallyPersistentEngine;
6065
friend class SynchronousEPEngine;

engines/ep/src/checkpoint_remover.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ CheckpointMemRecoveryTask::attemptCursorDropping() {
183183
case CheckpointRemoval::Eager: {
184184
#if CB_DEVELOPMENT_ASSERTS
185185
Expects(manager.removeClosedUnrefCheckpoints().count == 0);
186+
#else
187+
// MB-51408: The call must still be made so that the inner
188+
// checkpoint creation logic keeps executing - minimal fix for Neo.
189+
manager.removeClosedUnrefCheckpoints();
186190
#endif
187191
break;
188192
}
@@ -244,6 +248,10 @@ bool CheckpointMemRecoveryTask::runInner() {
244248
// This is not cheap to verify, as it requires scanning every
245249
// vbucket, so is only checked if dev asserts are on.
246250
Expects(attemptCheckpointRemoval().second == 0);
251+
#else
252+
// MB-51408: The call must still be made so that the inner checkpoint
253+
// creation logic keeps executing - minimal fix for Neo.
254+
attemptCheckpointRemoval();
247255
#endif
248256
break;
249257
}

engines/ep/tests/ep_testsuite.cc

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,9 +1801,6 @@ static enum test_result test_vbucket_destroy_stats(EngineIface* h) {
18011801
}
18021802
wait_for_flusher_to_settle(h);
18031803
testHarness->time_travel(65);
1804-
// store one more item to close the previous checkpoint, allowing it
1805-
// to be removed
1806-
wait_for_persisted_value(h, "padding-key", "some value", Vbid(1));
18071804
wait_for_stat_change(h, "ep_items_rm_from_checkpoints", itemsRemoved);
18081805

18091806
check(set_vbucket_state(h, Vbid(1), vbucket_state_dead),
@@ -2128,9 +2125,6 @@ static enum test_result test_mem_stats(EngineIface* h) {
21282125
int itemsRemoved = get_int_stat(h, "ep_items_rm_from_checkpoints");
21292126
wait_for_persisted_value(h, "key", value.c_str());
21302127
testHarness->time_travel(65);
2131-
// store a second item after advancing time. This will trigger a checkpoint
2132-
// close, and will allow the previous checkpoint to be removed.
2133-
wait_for_persisted_value(h, "key2", value.c_str());
21342128
if (isPersistentBucket(h)) {
21352129
wait_for_stat_change(h, "ep_items_rm_from_checkpoints", itemsRemoved);
21362130
}
@@ -4260,15 +4254,12 @@ static enum test_result test_disk_gt_ram_golden(EngineIface* h) {
42604254
// Store some data and check post-set state.
42614255
wait_for_persisted_value(h, "k1", "some value");
42624256
testHarness->time_travel(65);
4263-
// store one more item to close the previous checkpoint, allowing it
4264-
// to be removed
4265-
wait_for_persisted_value(h, "padding-key", "some value");
42664257
wait_for_stat_change(h, "ep_items_rm_from_checkpoints", itemsRemoved);
42674258

42684259
checkeq(0,
42694260
get_int_stat(h, "ep_bg_fetched"),
42704261
"Should start with zero bg fetches");
4271-
checkeq((initial_enqueued + 2),
4262+
checkeq((initial_enqueued + 1),
42724263
get_int_stat(h, "ep_total_enqueued"),
42734264
"Should have additional item enqueued after store");
42744265
int kv_size = get_int_stat(h, "ep_kv_size");
@@ -4288,7 +4279,7 @@ static enum test_result test_disk_gt_ram_golden(EngineIface* h) {
42884279
checkeq(1,
42894280
get_int_stat(h, "ep_bg_fetched"),
42904281
"BG fetches should be one after reading an evicted key");
4291-
checkeq((initial_enqueued + 2),
4282+
checkeq((initial_enqueued + 1),
42924283
get_int_stat(h, "ep_total_enqueued"),
42934284
"Item should not be marked dirty after reading an evicted key");
42944285

@@ -4303,9 +4294,6 @@ static enum test_result test_disk_gt_ram_golden(EngineIface* h) {
43034294
"Failed remove with value.");
43044295
wait_for_stat_change(h, "ep_total_persisted", numStored);
43054296
testHarness->time_travel(65);
4306-
// store one more item to close the previous checkpoint, allowing it
4307-
// to be removed
4308-
wait_for_persisted_value(h, "padding-key", "some value");
43094297
wait_for_stat_change(h, "ep_items_rm_from_checkpoints", itemsRemoved);
43104298

43114299
return SUCCESS;
@@ -4339,9 +4327,6 @@ static enum test_result test_disk_gt_ram_paged_rm(EngineIface* h) {
43394327
"Failed remove with value.");
43404328
wait_for_stat_change(h, "ep_total_persisted", numStored);
43414329
testHarness->time_travel(65);
4342-
// store one more item to close the previous checkpoint, allowing it
4343-
// to be removed
4344-
wait_for_persisted_value(h, "padding-key", "some value");
43454330
wait_for_stat_change(h, "ep_items_rm_from_checkpoints", itemsRemoved);
43464331

43474332
return SUCCESS;

engines/ep/tests/ep_testsuite_checkpoint.cc

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,6 @@ static enum test_result test_checkpoint_timeout(EngineIface* h) {
109109
store(h, nullptr, StoreSemantics::Set, "key", "value"),
110110
"Failed to store an item.");
111111
testHarness->time_travel(600);
112-
// store another item after advancing time, should close the previous
113-
// checkpoint
114-
checkeq(cb::engine_errc::success,
115-
store(h, nullptr, StoreSemantics::Set, "key2", "value"),
116-
"Failed to store second item.");
117112
wait_for_stat_to_be(h, "vb_0:open_checkpoint_id", 2, "checkpoint");
118113
return SUCCESS;
119114
}

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2002,18 +2002,18 @@ static enum test_result test_dcp_producer_stream_req_partial(EngineIface* h) {
20022002
get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint");
20032003
checkeq(1, initial_ckpt_id, "Expected to start at checkpoint ID 1");
20042004

2005-
// Create two 'full' checkpoints by storing 2 x 'chk_max_items' + 1
2005+
// Create two 'full' checkpoints by storing exactly 2 x 'chk_max_items'
20062006
// into the VBucket.
20072007
const auto max_ckpt_items = get_int_stat(h, "ep_chk_max_items");
20082008

2009-
write_items(h, max_ckpt_items + 1);
2009+
write_items(h, max_ckpt_items);
20102010
wait_for_flusher_to_settle(h);
20112011
wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items);
20122012
checkeq(initial_ckpt_id + 1,
20132013
get_int_stat(h, "vb_0:open_checkpoint_id", "checkpoint"),
20142014
"Expected #checkpoints to increase by 1 after storing items");
20152015

2016-
write_items(h, max_ckpt_items + 1, max_ckpt_items);
2016+
write_items(h, max_ckpt_items, max_ckpt_items);
20172017
wait_for_flusher_to_settle(h);
20182018
wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", max_ckpt_items * 2);
20192019
checkeq(initial_ckpt_id + 2,
@@ -2054,11 +2054,11 @@ static enum test_result test_dcp_producer_stream_req_partial(EngineIface* h) {
20542054
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
20552055
ctx.seqno = {105, 209};
20562056
ctx.snapshot = {105, 105};
2057-
ctx.exp_mutations = 97; // 105 to 200
2057+
ctx.exp_mutations = 95; // 105 to 200
20582058
ctx.exp_deletions = 100; // 201 to 300
20592059

20602060
if (isPersistentBucket(h)) {
2061-
ctx.exp_markers = 3;
2061+
ctx.exp_markers = 2;
20622062
} else {
20632063
// the ephemeral stream request won't be broken into two snapshots of
20642064
// backfill from disk vs the checkpoint in memory
@@ -2257,41 +2257,29 @@ static test_result testDcpProducerExpiredItemBackfill(
22572257
}
22582258

22592259
static enum test_result test_dcp_producer_stream_req_backfill(EngineIface* h) {
2260-
const auto max_ckpt_items = get_int_stat(h, "ep_chk_max_items");
2261-
2262-
const int batch_items = max_ckpt_items;
2263-
const int num_items = batch_items * 2;
2260+
const int num_items = 400, batch_items = 200;
22642261
for (int start_seqno = 0; start_seqno < num_items;
22652262
start_seqno += batch_items) {
2266-
if (start_seqno == batch_items) {
2267-
// stop persistence after the first batch is on disk
2263+
if (200 == start_seqno) {
22682264
wait_for_flusher_to_settle(h);
2269-
wait_for_stat_to_be(h,
2270-
"vb_0:persistence_seqno",
2271-
batch_items,
2272-
"vbucket-details");
2265+
wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", 200);
22732266
stop_persistence(h);
22742267
}
22752268
write_items(h, batch_items, start_seqno);
22762269
}
22772270

2278-
// Wait for the first checkpoint to be removed.
2279-
// The second checkpoint is the open checkpoint (and for a persistent
2280-
// bucket, contains unpersisted items) and won't be removed
2281-
wait_for_stat_to_be_gte(h, "ep_items_rm_from_checkpoints", batch_items);
2282-
22832271
auto* cookie = testHarness->create_cookie(h);
22842272

22852273
DcpStreamCtx ctx;
22862274
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
2287-
ctx.seqno = {0, uint64_t(batch_items)};
2275+
ctx.seqno = {0, 200};
22882276
// The idea here is that at backfill we get the full Disk/SeqList snapshot.
22892277
// Persistence has been stopped at seqno 200, while Ephemeral stores all
22902278
// seqnos in the SeqList.
22912279
if (isEphemeralBucket(h)) {
2292-
ctx.exp_mutations = num_items;
2280+
ctx.exp_mutations = 400;
22932281
} else {
2294-
ctx.exp_mutations = batch_items;
2282+
ctx.exp_mutations = 200;
22952283
}
22962284
ctx.exp_markers = 1;
22972285

@@ -2360,11 +2348,6 @@ static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
23602348
wait_for_flusher_to_settle(h);
23612349
verify_curr_items(h, num_items, "Wrong amount of items");
23622350

2363-
testHarness->time_travel(65);
2364-
// store one more item after advancing time to close the previous
2365-
// checkpoint, allowing it to be removed
2366-
wait_for_persisted_value(h, "padding-key", "some value");
2367-
23682351
/* Wait for the checkpoint to be removed so that upon DCP connection
23692352
backfill is scheduled */
23702353
wait_for_stat_to_be(h, "ep_items_rm_from_checkpoints", num_items);
@@ -2374,7 +2357,7 @@ static enum test_result test_dcp_producer_disk_backfill_buffer_limits(
23742357
DcpStreamCtx ctx;
23752358
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
23762359
ctx.seqno = {0, num_items};
2377-
ctx.exp_mutations = num_items + 1;
2360+
ctx.exp_mutations = 3;
23782361
ctx.exp_markers = 1;
23792362

23802363
TestDcpConsumer tdc("unittest", cookie, h);
@@ -2908,12 +2891,7 @@ static enum test_result test_dcp_producer_stream_cursor_movement(
29082891
cdc.dcpConsumer->producers.last_byseqno,
29092892
exp_items);
29102893

2911-
testHarness->time_travel(65);
2912-
// store one more item to close the previous checkpoint, allowing it
2913-
// to be removed
2914-
wait_for_persisted_value(h, "padding-key", "some value");
2915-
2916-
/* Wait for new open checkpoint to be added */
2894+
/* Wait for new open (empty) checkpoint to be added */
29172895
wait_for_stat_to_be(
29182896
h, "vb_0:open_checkpoint_id", curr_chkpt_id + 1, "checkpoint");
29192897

@@ -3774,11 +3752,6 @@ static enum test_result test_failover_scenario_two_with_dcp(EngineIface* h) {
37743752
// Front-end operations (sets)
37753753
write_items(h, 2, 1, "key_");
37763754

3777-
testHarness->time_travel(65);
3778-
// store one more item to close the previous checkpoint, allowing it
3779-
// to be removed
3780-
wait_for_persisted_value(h, "padding-key", "some value");
3781-
37823755
// Wait for a new open checkpoint
37833756
wait_for_stat_to_be(
37843757
h, "vb_0:open_checkpoint_id", openCheckpointId + 1, "checkpoint");
@@ -6156,19 +6129,14 @@ static enum test_result test_dcp_last_items_purged(EngineIface* h) {
61566129
get_int_stat(h, "vb_0:purge_seqno", "vbucket-seqno"),
61576130
"purge_seqno didn't match expected value");
61586131

6159-
testHarness->time_travel(65);
6160-
// store one more item to close the previous checkpoint, allowing it
6161-
// to be removed
6162-
wait_for_persisted_value(h, "padding-key", "some value");
6163-
61646132
wait_for_stat_to_be(h, "vb_0:open_checkpoint_id", 2, "checkpoint");
61656133
wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
61666134

61676135
/* Create a DCP stream */
61686136
DcpStreamCtx ctx;
61696137
ctx.vb_uuid = get_ull_stat(h, "vb_0:0:id", "failovers");
61706138
ctx.seqno = {0, get_ull_stat(h, "vb_0:high_seqno", "vbucket-seqno")};
6171-
ctx.exp_mutations = 2;
6139+
ctx.exp_mutations = 1;
61726140
ctx.exp_deletions = 1;
61736141
ctx.exp_markers = 1;
61746142
ctx.skip_estimate_check = true;
@@ -6241,11 +6209,6 @@ static enum test_result test_dcp_rollback_after_purge(EngineIface* h) {
62416209
get_int_stat(h, "vb_0:purge_seqno", "vbucket-seqno"),
62426210
"purge_seqno didn't match expected value");
62436211

6244-
testHarness->time_travel(65);
6245-
// store one more item to close the previous checkpoint, allowing it
6246-
// to be removed
6247-
wait_for_persisted_value(h, "padding-key", "some value");
6248-
62496212
wait_for_stat_to_be(h, "vb_0:open_checkpoint_id", 2, "checkpoint");
62506213
wait_for_stat_to_be(h, "vb_0:num_checkpoints", 1, "checkpoint");
62516214

engines/ep/tests/module_tests/checkpoint_remover_test.cc

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,65 @@ TEST_P(CheckpointRemoverTest, expelWithoutCursor) {
390390
EXPECT_GT(stats.memFreedByCheckpointItemExpel, 0);
391391
}
392392

393+
TEST_P(CheckpointRemoverTest, MemRecoveryByCheckpointCreation) {
394+
setVBucketStateAndRunPersistTask(Vbid(0), vbucket_state_active);
395+
setVBucketStateAndRunPersistTask(Vbid(1), vbucket_state_active);
396+
397+
auto& config = engine->getConfiguration();
398+
config.setChkExpelEnabled(true);
399+
config.setMaxSize(1024 * 1024 * 100);
400+
401+
// @todo: Convert to config.setCheckpointRemovalMode("eager") once the
402+
// param has been made dynamic
403+
auto& ckptConfig = engine->getCheckpointConfig();
404+
ckptConfig.setCheckpointRemovalMode(CheckpointRemoval::Eager);
405+
ASSERT_TRUE(engine->getCheckpointConfig().isEagerCheckpointRemoval());
406+
407+
ASSERT_EQ(0, store->getRequiredCheckpointMemoryReduction());
408+
409+
// Compute payload size such that we enter a TempOOM phase when we store
410+
// the second item.
411+
const size_t valueSize =
412+
config.getMaxSize() * config.getCheckpointMemoryRatio() *
413+
config.getCheckpointMemoryRecoveryUpperMark() / 2 +
414+
1;
415+
const auto value = std::string(valueSize, 'x');
416+
// Store first item, no checkpoint OOM yet
417+
store_item(Vbid(0), makeStoredDocKey("keyA"), value);
418+
EXPECT_EQ(0, store->getRequiredCheckpointMemoryReduction());
419+
// Store second item, Checkpoint OOM
420+
store_item(Vbid(1), makeStoredDocKey("keyB"), value);
421+
ASSERT_GT(store->getRequiredCheckpointMemoryReduction(), 0);
422+
423+
// Move the cursors to the end of the open checkpoint. Step required to
424+
// allow checkpoint creation + cursor jumping into the new checkpoints in
425+
// the next steps
426+
flushVBucketToDiskIfPersistent(Vbid(0), 1);
427+
flushVBucketToDiskIfPersistent(Vbid(1), 1);
428+
429+
const auto& stats = engine->getEpStats();
430+
ASSERT_EQ(0, stats.itemsExpelledFromCheckpoints);
431+
ASSERT_EQ(0, stats.itemsRemovedFromCheckpoints);
432+
433+
// Mem-recovery is expected to:
434+
// 1. Create a new checkpoint on at least 1 vbucket
435+
// 2. Move the cursors from the closed checkpoint to the open one
436+
// 3. Remove the closed (and now unreferenced) checkpoint
437+
const auto remover = std::make_shared<CheckpointMemRecoveryTask>(
438+
engine.get(),
439+
engine->getEpStats(),
440+
engine->getConfiguration().getChkRemoverStime(),
441+
0);
442+
remover->run();
443+
444+
// That allows checkpoints to be removed and the OOM to be recovered from.
445+
// Before the fix, nothing removed from checkpoints and mem-reduction still
446+
// required at this point
447+
EXPECT_EQ(0, stats.itemsExpelledFromCheckpoints);
448+
EXPECT_GT(stats.itemsRemovedFromCheckpoints, 0);
449+
EXPECT_EQ(0, store->getRequiredCheckpointMemoryReduction());
450+
}
451+
393452
// Test written for MB-36366. With the fix removed this test failed because
394453
// post expel, we continued onto cursor dropping.
395454
// MB-36447 - unreliable test, disabling for now

0 commit comments

Comments
 (0)