Skip to content

Commit 075614a

Browse files
committed
MB-30019: Always close replica-checkpoint on memory-snapshot-end
Currently, when a Consumer receives the snapshot-end mutation of a memory-snapshot we close the current open checkpoint only if (mem_used > high_watermark). As seen in MB-30019, we may end up keeping big checkpoints in memory when a Consumer has received the latest snapshot. With this patch we unconditionally close an open replica-checkpoint when the Consumer receives the snapshot-end mutation for a memory-snapshot. Change-Id: I9ed5388dd8f5bbe0b51113ca139049d1825c4790 Reviewed-on: http://review.couchbase.org/95260 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 1aae4b1 commit 075614a

File tree

9 files changed

+173
-33
lines changed

9 files changed

+173
-33
lines changed

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
280280
tests/module_tests/checkpoint_remover_test.cc
281281
tests/module_tests/checkpoint_test.h
282282
tests/module_tests/checkpoint_test.cc
283+
tests/module_tests/checkpoint_utils.h
283284
tests/module_tests/collections/collection_dockey_test.cc
284285
tests/module_tests/collections/evp_store_collections_dcp_test.cc
285286
tests/module_tests/collections/evp_store_collections_eraser_test.cc

engines/ep/src/dcp/stream.cc

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2715,23 +2715,17 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
27152715
void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
27162716
if (byseqno == cur_snapshot_end.load()) {
27172717
auto& ckptMgr = *vb->checkpointManager;
2718+
27182719
if (cur_snapshot_type.load() == Snapshot::Disk &&
27192720
vb->isBackfillPhase()) {
27202721
vb->setBackfillPhase(false);
2721-
const auto id = ckptMgr.getOpenCheckpointId() + 1;
2722-
ckptMgr.checkAndAddNewCheckpoint(id, *vb);
2723-
} else {
2724-
size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
2725-
size_t mem_used =
2726-
engine->getEpStats().getEstimatedTotalMemoryUsed();
2727-
/* We want to add a new replica checkpoint if the mem usage is above
2728-
high watermark (85%) */
2729-
if (mem_threshold < mem_used) {
2730-
const auto id = ckptMgr.getOpenCheckpointId() + 1;
2731-
ckptMgr.checkAndAddNewCheckpoint(id, *vb);
2732-
}
27332722
}
27342723

2724+
// MB-30019: we always want to close the open checkpoint on replica
2725+
// vbuckets when the Consumer receives the snapshot-end mutation
2726+
ckptMgr.checkAndAddNewCheckpoint(ckptMgr.getOpenCheckpointId() + 1,
2727+
*vb);
2728+
27352729
if (cur_snapshot_ack) {
27362730
{
27372731
LockHolder lh(streamMutex);

engines/ep/src/dcp/stream.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ class PassiveStream : public Stream {
742742

743743
bool transitionState(StreamState newState);
744744

745-
ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
745+
virtual ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
746746

747747
ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
748748

@@ -782,7 +782,7 @@ class PassiveStream : public Stream {
782782

783783
void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
784784

785-
void processMarker(SnapshotMarker* marker);
785+
virtual void processMarker(SnapshotMarker* marker);
786786

787787
void processSetVBucketState(SetVBucketState* state);
788788

engines/ep/tests/mock/mock_stream.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,14 @@ class MockPassiveStream : public PassiveStream {
272272
return PassiveStream::messageReceived(std::move(dcpResponse));
273273
}
274274

275+
void processMarker(SnapshotMarker* marker) override {
276+
PassiveStream::processMarker(marker);
277+
}
278+
279+
ENGINE_ERROR_CODE processMutation(MutationResponse* mutation) override {
280+
return PassiveStream::processMutation(mutation);
281+
}
282+
275283
size_t getNumBufferItems() const {
276284
LockHolder lh(buffer.bufMutex);
277285
return buffer.messages.size();

engines/ep/tests/module_tests/checkpoint_remover_test.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
*/
1717

1818
#include "checkpoint_remover_test.h"
19+
20+
#include "checkpoint_utils.h"
21+
1922
#include <engines/ep/src/checkpoint_remover.h>
2023

2124
#include "../mock/mock_dcp.h"
@@ -28,12 +31,6 @@ size_t CheckpointRemoverTest::getMaxCheckpointItems(VBucket& vb) {
2831
return vb.checkpointManager->getCheckpointConfig().getCheckpointMaxItems();
2932
}
3033

31-
const CheckpointList&
32-
CheckpointManagerTestIntrospector::public_getCheckpointList(
33-
CheckpointManager& checkpointManager) {
34-
return checkpointManager.checkpointList;
35-
}
36-
3734
/**
3835
* Check that the VBucketMap.getActiveVBucketsSortedByChkMgrMem() returns the
3936
* correct ordering of vBuckets, sorted from largest memory usage to smallest.
@@ -207,4 +204,4 @@ TEST_F(CheckpointRemoverEPTest, CursorDropMemoryFreed) {
207204

208205
// There should only be the one checkpoint cursor now for persistence
209206
ASSERT_EQ(1, checkpointManager->getNumOfCursors());
210-
}
207+
}

engines/ep/tests/module_tests/checkpoint_remover_test.h

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,3 @@ class CheckpointRemoverEPTest : public CheckpointRemoverTest {
5050
return dynamic_cast<EPBucket&>(*store);
5151
}
5252
};
53-
54-
/**
55-
* Stateless class used to gain privileged access into CheckpointManager for
56-
* testing purposes. This is a friend class of CheckpointManager.
57-
*/
58-
class CheckpointManagerTestIntrospector {
59-
public:
60-
static const CheckpointList& public_getCheckpointList(
61-
CheckpointManager& checkpointManager);
62-
};

engines/ep/tests/module_tests/checkpoint_test.cc

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
* limitations under the License.
1616
*/
1717

18+
#include "checkpoint_test.h"
19+
1820
#include "config.h"
1921

2022
#include <algorithm>
@@ -23,7 +25,7 @@
2325
#include <vector>
2426

2527
#include "checkpoint.h"
26-
#include "checkpoint_test.h"
28+
#include "checkpoint_utils.h"
2729
#include "configuration.h"
2830
#include "ep_vb.h"
2931
#include "failover-table.h"
@@ -32,6 +34,8 @@
3234
#include "tests/module_tests/test_helpers.h"
3335
#include "thread_gate.h"
3436

37+
#include "../mock/mock_dcp_consumer.h"
38+
3539
#include <engines/ep/src/ep_types.h>
3640
#include <gmock/gmock.h>
3741
#include <gtest/gtest.h>
@@ -1235,3 +1239,113 @@ TYPED_TEST(CheckpointTest,
12351239
// Test - second item (duplicate key) should return false.
12361240
EXPECT_FALSE(this->queueNewItem("key"));
12371241
}
1242+
1243+
/*
1244+
* We always want to close the current open checkpoint on replica-vbuckets
1245+
* when the Consumer receives the snapshotEnd mutation of a memory-snapshot.
1246+
*/
1247+
TEST_F(SingleThreadedCheckpointTest,
1248+
MB30019_CloseReplicaCheckpointOnMemorySnapshotEnd) {
1249+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1250+
auto vb = store->getVBuckets().getBucket(vbid);
1251+
auto* ckptMgr = vb->checkpointManager.get();
1252+
ASSERT_NE(nullptr, ckptMgr);
1253+
1254+
// We must have only 1 open checkpoint
1255+
ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1256+
// We must have only one cursor (the persistence cursor), as there is no
1257+
// DCP producer for vbid
1258+
ASSERT_EQ(1, ckptMgr->getNumOfCursors());
1259+
// We must have only the checkpoint-open and the vbucket-state meta-items
1260+
// in the open checkpoint
1261+
ASSERT_EQ(2, ckptMgr->getNumItems());
1262+
ASSERT_EQ(0, ckptMgr->getNumOpenChkItems());
1263+
1264+
auto consumer =
1265+
std::make_shared<MockDcpConsumer>(*engine, cookie, "test-consumer");
1266+
auto passiveStream = std::static_pointer_cast<MockPassiveStream>(
1267+
consumer->makePassiveStream(
1268+
*engine,
1269+
consumer,
1270+
"test-passive-stream",
1271+
0 /* flags */,
1272+
0 /* opaque */,
1273+
vbid,
1274+
0 /* startSeqno */,
1275+
std::numeric_limits<uint64_t>::max() /* endSeqno */,
1276+
0 /* vbUuid */,
1277+
0 /* snapStartSeqno */,
1278+
0 /* snapEndSeqno */,
1279+
0 /* vb_high_seqno */));
1280+
1281+
const size_t snapshotEnd = 3;
1282+
// 1) the consumer receives the snapshot-marker
1283+
SnapshotMarker snapshotMarker(
1284+
0 /* opaque */,
1285+
vbid,
1286+
0 /* startSeqno */,
1287+
snapshotEnd /* endSeqno */,
1288+
dcp_marker_flag_t::MARKER_FLAG_MEMORY /* flags */);
1289+
passiveStream->processMarker(&snapshotMarker);
1290+
1291+
// 2) the consumer receives the mutations until (snapshotEnd -1)
1292+
size_t i = 1;
1293+
for (; i < snapshotEnd; i++) {
1294+
// Queue item
1295+
queued_item qi(new Item(makeStoredDocKey("key_" + std::to_string(i)),
1296+
0 /*flags*/,
1297+
0 /*expiry*/,
1298+
"value",
1299+
5 /*valueSize*/,
1300+
PROTOCOL_BINARY_RAW_BYTES,
1301+
0 /*cas*/,
1302+
i /*bySeqno*/,
1303+
vb->getId()));
1304+
1305+
MutationResponse mutation(std::move(qi), 0 /* opaque */);
1306+
1307+
// PassiveStream::processMutation does 2 things:
1308+
// 1) setWithMeta (which enqueues the item into the checkpoint)
1309+
// 2) calls PassiveStream::handleSnapshotEnd (which must close the
1310+
// open checkpoint if the current mutation is the
1311+
// snapshot-end)
1312+
passiveStream->processMutation(&mutation);
1313+
}
1314+
// We must have 2 items in the checkpoint now
1315+
ASSERT_EQ(snapshotEnd - 1, ckptMgr->getNumOpenChkItems());
1316+
// We still must have only 1 open checkpoint, as the consumer has not
1317+
// received the snapshot-end mutation
1318+
ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1319+
1320+
// 3) the consumer receives the snapshotEnd mutation
1321+
queued_item qi(
1322+
new Item(makeStoredDocKey("key_" + std::to_string(snapshotEnd)),
1323+
0 /*flags*/,
1324+
0 /*expiry*/,
1325+
"value",
1326+
5 /*valueSize*/,
1327+
PROTOCOL_BINARY_RAW_BYTES,
1328+
0 /*cas*/,
1329+
i /*bySeqno*/,
1330+
vb->getId()));
1331+
MutationResponse mutation(std::move(qi), 0 /* opaque */);
1332+
passiveStream->processMutation(&mutation);
1333+
1334+
// The consumer has received the snapshotEnd mutation, now we expect
1335+
// that a new (empty) open checkpoint has been created. So we must have
1336+
// 2 checkpoints in total (the closed and the new open one).
1337+
ASSERT_EQ(2, ckptMgr->getNumCheckpoints());
1338+
1339+
// Also, the new open checkpoint must be empty (all mutations are in the
1340+
// closed one)
1341+
const auto& ckptList =
1342+
CheckpointManagerTestIntrospector::public_getCheckpointList(
1343+
*ckptMgr);
1344+
ASSERT_EQ(ckptList.back()->getId(), ckptList.front()->getId() + 1);
1345+
ASSERT_EQ(checkpoint_state::CHECKPOINT_CLOSED,
1346+
ckptList.front()->getState_UNLOCKED());
1347+
ASSERT_EQ(snapshotEnd, ckptList.front()->getNumItems());
1348+
ASSERT_EQ(checkpoint_state::CHECKPOINT_OPEN,
1349+
ckptList.back()->getState_UNLOCKED());
1350+
ASSERT_EQ(0, ckptList.back()->getNumItems());
1351+
}

engines/ep/tests/module_tests/checkpoint_test.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "checkpoint_config.h"
2323
#include "configuration.h"
24+
#include "evp_store_single_threaded_test.h"
2425

2526
#include <gtest/gtest.h>
2627

@@ -60,4 +61,9 @@ class CheckpointTest : public ::testing::Test {
6061
std::shared_ptr<Callback<uint16_t> > callback;
6162
std::unique_ptr<V> vbucket;
6263
std::unique_ptr<CheckpointManager> manager;
63-
};
64+
};
65+
66+
/*
67+
* Test fixture for single-threaded Checkpoint tests
68+
*/
69+
class SingleThreadedCheckpointTest : public SingleThreadedKVBucketTest {};
engines/ep/tests/module_tests/checkpoint_utils.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2018 Couchbase, Inc
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "checkpoint.h"
19+
20+
/**
21+
* Stateless class used to gain privileged access into CheckpointManager for
22+
* testing purposes. This is a friend class of CheckpointManager.
23+
*/
24+
class CheckpointManagerTestIntrospector {
25+
public:
26+
static const CheckpointList& public_getCheckpointList(
27+
CheckpointManager& checkpointManager) {
28+
return checkpointManager.checkpointList;
29+
}
30+
};

0 commit comments

Comments
 (0)