Skip to content

Commit 2b37a5a

Browse files
Sriram Ganesandave-finlay
authored andcommitted
MB-30234: Revert "Always close replica-checkpoint on memory-snapshot-end"
This reverts commit 075614a (MB-30019) Reverting this commit as this has resulted in a throughput degradation Change-Id: I12ba07f8155c998c0c797bf88baa6fa569a404bf Reviewed-on: http://review.couchbase.org/96014 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Finlay <[email protected]> Tested-by: Dave Finlay <[email protected]>
1 parent eebaff6 commit 2b37a5a

File tree

9 files changed

+33
-173
lines changed

9 files changed

+33
-173
lines changed

engines/ep/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,6 @@ if (COUCHBASE_KV_BUILD_UNIT_TESTS)
280280
tests/module_tests/checkpoint_remover_test.cc
281281
tests/module_tests/checkpoint_test.h
282282
tests/module_tests/checkpoint_test.cc
283-
tests/module_tests/checkpoint_utils.h
284283
tests/module_tests/collections/collection_dockey_test.cc
285284
tests/module_tests/collections/evp_store_collections_dcp_test.cc
286285
tests/module_tests/collections/evp_store_collections_eraser_test.cc

engines/ep/src/dcp/stream.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2722,17 +2722,23 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
27222722
void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
27232723
if (byseqno == cur_snapshot_end.load()) {
27242724
auto& ckptMgr = *vb->checkpointManager;
2725-
27262725
if (cur_snapshot_type.load() == Snapshot::Disk &&
27272726
vb->isBackfillPhase()) {
27282727
vb->setBackfillPhase(false);
2728+
const auto id = ckptMgr.getOpenCheckpointId() + 1;
2729+
ckptMgr.checkAndAddNewCheckpoint(id, *vb);
2730+
} else {
2731+
size_t mem_threshold = engine->getEpStats().mem_high_wat.load();
2732+
size_t mem_used =
2733+
engine->getEpStats().getEstimatedTotalMemoryUsed();
2734+
/* We want to add a new replica checkpoint if the mem usage is above
2735+
high watermark (85%) */
2736+
if (mem_threshold < mem_used) {
2737+
const auto id = ckptMgr.getOpenCheckpointId() + 1;
2738+
ckptMgr.checkAndAddNewCheckpoint(id, *vb);
2739+
}
27292740
}
27302741

2731-
// MB-30019: we always want to close the open checkpoint on replica
2732-
// vbuckets when the Consumer receives the snapshot-end mutation
2733-
ckptMgr.checkAndAddNewCheckpoint(ckptMgr.getOpenCheckpointId() + 1,
2734-
*vb);
2735-
27362742
if (cur_snapshot_ack) {
27372743
{
27382744
LockHolder lh(streamMutex);

engines/ep/src/dcp/stream.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ class PassiveStream : public Stream {
742742

743743
bool transitionState(StreamState newState);
744744

745-
virtual ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
745+
ENGINE_ERROR_CODE processMutation(MutationResponse* mutation);
746746

747747
ENGINE_ERROR_CODE processDeletion(MutationResponse* deletion);
748748

@@ -782,7 +782,7 @@ class PassiveStream : public Stream {
782782

783783
void handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno);
784784

785-
virtual void processMarker(SnapshotMarker* marker);
785+
void processMarker(SnapshotMarker* marker);
786786

787787
void processSetVBucketState(SetVBucketState* state);
788788

engines/ep/tests/mock/mock_stream.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -272,14 +272,6 @@ class MockPassiveStream : public PassiveStream {
272272
return PassiveStream::messageReceived(std::move(dcpResponse));
273273
}
274274

275-
void processMarker(SnapshotMarker* marker) override {
276-
PassiveStream::processMarker(marker);
277-
}
278-
279-
ENGINE_ERROR_CODE processMutation(MutationResponse* mutation) override {
280-
return PassiveStream::processMutation(mutation);
281-
}
282-
283275
size_t getNumBufferItems() const {
284276
LockHolder lh(buffer.bufMutex);
285277
return buffer.messages.size();

engines/ep/tests/module_tests/checkpoint_remover_test.cc

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616
*/
1717

1818
#include "checkpoint_remover_test.h"
19-
20-
#include "checkpoint_utils.h"
21-
2219
#include <engines/ep/src/checkpoint_remover.h>
2320

2421
#include "../mock/mock_dcp.h"
@@ -31,6 +28,12 @@ size_t CheckpointRemoverTest::getMaxCheckpointItems(VBucket& vb) {
3128
return vb.checkpointManager->getCheckpointConfig().getCheckpointMaxItems();
3229
}
3330

31+
const CheckpointList&
32+
CheckpointManagerTestIntrospector::public_getCheckpointList(
33+
CheckpointManager& checkpointManager) {
34+
return checkpointManager.checkpointList;
35+
}
36+
3437
/**
3538
* Check that the VBucketMap.getActiveVBucketsSortedByChkMgrMem() returns the
3639
* correct ordering of vBuckets, sorted from largest memory usage to smallest.
@@ -204,4 +207,4 @@ TEST_F(CheckpointRemoverEPTest, CursorDropMemoryFreed) {
204207

205208
// There should only be the one checkpoint cursor now for persistence
206209
ASSERT_EQ(1, checkpointManager->getNumOfCursors());
207-
}
210+
}

engines/ep/tests/module_tests/checkpoint_remover_test.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,13 @@ class CheckpointRemoverEPTest : public CheckpointRemoverTest {
5050
return dynamic_cast<EPBucket&>(*store);
5151
}
5252
};
53+
54+
/**
55+
* Stateless class used to gain privileged access into CheckpointManager for
56+
* testing purposes. This is a friend class of CheckpointManager.
57+
*/
58+
class CheckpointManagerTestIntrospector {
59+
public:
60+
static const CheckpointList& public_getCheckpointList(
61+
CheckpointManager& checkpointManager);
62+
};

engines/ep/tests/module_tests/checkpoint_test.cc

Lines changed: 1 addition & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
* limitations under the License.
1616
*/
1717

18-
#include "checkpoint_test.h"
19-
2018
#include "config.h"
2119

2220
#include <algorithm>
@@ -25,7 +23,7 @@
2523
#include <vector>
2624

2725
#include "checkpoint.h"
28-
#include "checkpoint_utils.h"
26+
#include "checkpoint_test.h"
2927
#include "configuration.h"
3028
#include "ep_vb.h"
3129
#include "failover-table.h"
@@ -34,8 +32,6 @@
3432
#include "tests/module_tests/test_helpers.h"
3533
#include "thread_gate.h"
3634

37-
#include "../mock/mock_dcp_consumer.h"
38-
3935
#include <engines/ep/src/ep_types.h>
4036
#include <gmock/gmock.h>
4137
#include <gtest/gtest.h>
@@ -1239,113 +1235,3 @@ TYPED_TEST(CheckpointTest,
12391235
// Test - second item (duplicate key) should return false.
12401236
EXPECT_FALSE(this->queueNewItem("key"));
12411237
}
1242-
1243-
/*
1244-
* We always want to close the current open checkpoint on replica-vbuckets
1245-
* when the Consumer receives the snapshotEnd mutation of a memory-snapshot.
1246-
*/
1247-
TEST_F(SingleThreadedCheckpointTest,
1248-
MB30019_CloseReplicaCheckpointOnMemorySnapshotEnd) {
1249-
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
1250-
auto vb = store->getVBuckets().getBucket(vbid);
1251-
auto* ckptMgr = vb->checkpointManager.get();
1252-
ASSERT_NE(nullptr, ckptMgr);
1253-
1254-
// We must have only 1 open checkpoint
1255-
ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1256-
// We must have only one cursor (the persistence cursor), as there is no
1257-
// DCP producer for vbid
1258-
ASSERT_EQ(1, ckptMgr->getNumOfCursors());
1259-
// We must have only the checkpoint-open and the vbucket-state meta-items
1260-
// in the open checkpoint
1261-
ASSERT_EQ(2, ckptMgr->getNumItems());
1262-
ASSERT_EQ(0, ckptMgr->getNumOpenChkItems());
1263-
1264-
auto consumer =
1265-
std::make_shared<MockDcpConsumer>(*engine, cookie, "test-consumer");
1266-
auto passiveStream = std::static_pointer_cast<MockPassiveStream>(
1267-
consumer->makePassiveStream(
1268-
*engine,
1269-
consumer,
1270-
"test-passive-stream",
1271-
0 /* flags */,
1272-
0 /* opaque */,
1273-
vbid,
1274-
0 /* startSeqno */,
1275-
std::numeric_limits<uint64_t>::max() /* endSeqno */,
1276-
0 /* vbUuid */,
1277-
0 /* snapStartSeqno */,
1278-
0 /* snapEndSeqno */,
1279-
0 /* vb_high_seqno */));
1280-
1281-
const size_t snapshotEnd = 3;
1282-
// 1) the consumer receives the snapshot-marker
1283-
SnapshotMarker snapshotMarker(
1284-
0 /* opaque */,
1285-
vbid,
1286-
0 /* startSeqno */,
1287-
snapshotEnd /* endSeqno */,
1288-
dcp_marker_flag_t::MARKER_FLAG_MEMORY /* flags */);
1289-
passiveStream->processMarker(&snapshotMarker);
1290-
1291-
// 2) the consumer receives the mutations until (snapshotEnd -1)
1292-
size_t i = 1;
1293-
for (; i < snapshotEnd; i++) {
1294-
// Queue item
1295-
queued_item qi(new Item(makeStoredDocKey("key_" + std::to_string(i)),
1296-
0 /*flags*/,
1297-
0 /*expiry*/,
1298-
"value",
1299-
5 /*valueSize*/,
1300-
PROTOCOL_BINARY_RAW_BYTES,
1301-
0 /*cas*/,
1302-
i /*bySeqno*/,
1303-
vb->getId()));
1304-
1305-
MutationResponse mutation(std::move(qi), 0 /* opaque */);
1306-
1307-
// PassiveStream::processMutation does 2 things:
1308-
// 1) setWithMeta (which enqueues the item into the checkpoint)
1309-
// 2) calls PassiveStream::handleSnapshotEnd (which must close the
1310-
// open checkpoint if the current mutation is the
1311-
// snapshot-end)
1312-
passiveStream->processMutation(&mutation);
1313-
}
1314-
// We must have 2 items in the checkpoint now
1315-
ASSERT_EQ(snapshotEnd - 1, ckptMgr->getNumOpenChkItems());
1316-
// We still must have only 1 open checkpoint, as the consumer has not
1317-
// received the snapshot-end mutation
1318-
ASSERT_EQ(1, ckptMgr->getNumCheckpoints());
1319-
1320-
// 3) the consumer receives the snapshotEnd mutation
1321-
queued_item qi(
1322-
new Item(makeStoredDocKey("key_" + std::to_string(snapshotEnd)),
1323-
0 /*flags*/,
1324-
0 /*expiry*/,
1325-
"value",
1326-
5 /*valueSize*/,
1327-
PROTOCOL_BINARY_RAW_BYTES,
1328-
0 /*cas*/,
1329-
i /*bySeqno*/,
1330-
vb->getId()));
1331-
MutationResponse mutation(std::move(qi), 0 /* opaque */);
1332-
passiveStream->processMutation(&mutation);
1333-
1334-
// The consumer has received the snapshotEnd mutation, now we expect
1335-
// that a new (empty) open checkpoint has been created. So we must have
1336-
// 2 checkpoints in total (the closed and the new open one).
1337-
ASSERT_EQ(2, ckptMgr->getNumCheckpoints());
1338-
1339-
// Also, the new open checkpoint must be empty (all mutations are in the
1340-
// closed one)
1341-
const auto& ckptList =
1342-
CheckpointManagerTestIntrospector::public_getCheckpointList(
1343-
*ckptMgr);
1344-
ASSERT_EQ(ckptList.back()->getId(), ckptList.front()->getId() + 1);
1345-
ASSERT_EQ(checkpoint_state::CHECKPOINT_CLOSED,
1346-
ckptList.front()->getState_UNLOCKED());
1347-
ASSERT_EQ(snapshotEnd, ckptList.front()->getNumItems());
1348-
ASSERT_EQ(checkpoint_state::CHECKPOINT_OPEN,
1349-
ckptList.back()->getState_UNLOCKED());
1350-
ASSERT_EQ(0, ckptList.back()->getNumItems());
1351-
}

engines/ep/tests/module_tests/checkpoint_test.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include "checkpoint_config.h"
2323
#include "configuration.h"
24-
#include "evp_store_single_threaded_test.h"
2524

2625
#include <gtest/gtest.h>
2726

@@ -61,9 +60,4 @@ class CheckpointTest : public ::testing::Test {
6160
std::shared_ptr<Callback<uint16_t> > callback;
6261
std::unique_ptr<V> vbucket;
6362
std::unique_ptr<CheckpointManager> manager;
64-
};
65-
66-
/*
67-
* Test fixture for single-threaded Checkpoint tests
68-
*/
69-
class SingleThreadedCheckpointTest : public SingleThreadedKVBucketTest {};
63+
};

engines/ep/tests/module_tests/checkpoint_utils.h

Lines changed: 0 additions & 30 deletions
This file was deleted.

0 commit comments

Comments
 (0)