Skip to content

Commit 178bcae

Browse files
committed
Merge commit neo/87dd4209e into master
* 87dd420 Revert "MB-57400: Use an estimated items_remaining count for Prometheus stats" * 6dcc716 Revert "MB-57400: "estimated" items_remaining drops to 0 when cursor settles" * c328ad5 MB-52276: Remove unused 'numItems' args in DCPTest::removeCheckpoint() * 25598d6 MB-52276: Remove unused arg in STActiveStreamTest::setupProducer * 69da985 [BP] MB-52276: Remove broken and unused Checkpoint::numMetaItems * dcaec69 [BP] MB-52276: Remove unused Checkpoint::numItems * 831cc2a [BP] MB-52276: Make ActiveStream::getItemsRemaining consistent * 3af4664 [BP] MB-52276: Make CheckpointCursor::getRemainingItemsCount O(1) * d324f13 MB-52276: Fix StreamTest.BackfillSmallBuffer * 27a56dc [BP] MB-52276: Fix CheckpointCursor::getRemainingItemsCount * f2b1d9e [BP] MB-52276: Checkpoint::getNumItems() accounts all items * 2b6245d [BP] MB-52276: Don't use Checkpoint::numItems at CM::maybeCreateNewCheckpoint * 52d3e96 [BP] MB-52276: Don't use Checkpoint::numItems at CM::getVisibleSnapshotEndSeqno * 88210c5 [BP] MB-52276: Don't use Checkpoint::numItems at CM::getSnapshotInfo * f2652b6 [BP] MB-52276: Don't rely on Checkpoint::numItems at ItemExpel Change-Id: I170fcd4544cbfaaf37e7dc55feff63e5fb7956ca
2 parents c1d3144 + 87dd420 commit 178bcae

16 files changed

+49
-79
lines changed

engines/ep/src/dcp/active_stream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ class ActiveStream : public Stream,
291291
CookieIface& c,
292292
const VBucket& vb);
293293

294-
/* Returns a count of how many items are outstanding to be sent for this
294+
/**
295+
* Returns a count of how many items are outstanding to be sent for this
295296
* stream's vBucket.
296297
*/
297298
size_t getItemsRemaining();

engines/ep/src/dcp/producer.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,11 @@ class DcpProducer : public ConnHandler,
385385
*/
386386
void scheduleCheckpointProcessorTask(std::shared_ptr<ActiveStream> s);
387387

388+
/**
389+
* Schedule the checkpointCreatorTask on the ExecutorPool
390+
*/
391+
void scheduleCheckpointProcessorTask();
392+
388393
/** Searches the streams map for a stream for vbucket ID. Returns the
389394
* found stream, or an empty pointer if none found.
390395
*/
@@ -449,11 +454,6 @@ class DcpProducer : public ConnHandler,
449454
*/
450455
void createCheckpointProcessorTask();
451456

452-
/**
453-
* Schedule the checkpointCreatorTask on the ExecutorPool
454-
*/
455-
void scheduleCheckpointProcessorTask();
456-
457457
struct {
458458
// The time of the last noop message sent by this producer
459459
std::chrono::steady_clock::time_point sendTime;

engines/ep/src/ep_engine.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4047,6 +4047,11 @@ struct ConnStatBuilder {
40474047
};
40484048

40494049
struct ConnAggStatBuilder {
4050+
/**
4051+
* Construct with the separator.
4052+
* @param sep The separator used for determining "type" of DCP connection
4053+
* by splitting the connection name with sep.
4054+
*/
40504055
ConnAggStatBuilder(std::string_view sep) : sep(sep) {
40514056
}
40524057

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,6 @@ class MockDcpProducer : public DcpProducer {
110110
DcpProducer::createCheckpointProcessorTask();
111111
}
112112

113-
/**
114-
* Schedule the checkpointCreator->task on the ExecutorPool
115-
*/
116-
void scheduleCheckpointProcessorTask() {
117-
DcpProducer::scheduleCheckpointProcessorTask();
118-
}
119-
120113
ActiveStreamCheckpointProcessorTask* getCheckpointSnapshotTask() const;
121114

122115
/**

engines/ep/tests/module_tests/dcp_durability_stream_test.cc

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,14 @@
3939

4040
void DurabilityActiveStreamTest::SetUp() {
4141
SingleThreadedActiveStreamTest::SetUp();
42-
setUp(false /*startCheckpointProcessorTask*/);
42+
setUp();
4343
}
4444

4545
void DurabilityActiveStreamTest::TearDown() {
4646
SingleThreadedActiveStreamTest::TearDown();
4747
}
4848

49-
void DurabilityActiveStreamTest::setUp(bool startCheckpointProcessorTask,
50-
bool persist) {
49+
void DurabilityActiveStreamTest::setUp(bool persist) {
5150
setVBucketState(vbid,
5251
vbucket_state_active,
5352
{{"topology", nlohmann::json::array({{active, replica}})}});
@@ -62,8 +61,7 @@ void DurabilityActiveStreamTest::setUp(bool startCheckpointProcessorTask,
6261
// Enable SyncReplication and flow-control (Producer BufferLog)
6362
setupProducer({{"enable_sync_writes", "true"},
6463
{"connection_buffer_size", "52428800"},
65-
{"consumer_name", "test_consumer"}},
66-
startCheckpointProcessorTask);
64+
{"consumer_name", "test_consumer"}});
6765
ASSERT_TRUE(stream->public_supportSyncReplication());
6866
}
6967

@@ -391,7 +389,6 @@ TEST_P(DurabilityActiveStreamTest, SendDcpAbort) {
391389
}
392390

393391
TEST_P(DurabilityActiveStreamTest, BackfillDurabilityLevel) {
394-
startCheckpointTask();
395392
auto vb = engine->getVBucket(vbid);
396393
auto& ckptMgr = *vb->checkpointManager;
397394
// Get rid of set_vb_state and any other queue_op we are not interested in
@@ -4661,8 +4658,7 @@ void DurabilityPromotionStreamTest::testDiskCheckpointStreamedAsDiskSnapshot() {
46614658
}
46624659

46634660
// 3) Set up the Producer and ActiveStream
4664-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/,
4665-
false /*persist*/);
4661+
DurabilityActiveStreamTest::setUp(false /*persist*/);
46664662

46674663
// The vbstate-change must have:
46684664
// - closed the checkpoint snap{2, 4, Disk}
@@ -4931,8 +4927,7 @@ void DurabilityPromotionStreamTest::
49314927

49324928
// 3) Simulate vbstate-change Replica->Active (we have also a Producer and
49334929
// ActiveStream from this point onward)
4934-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/,
4935-
false /*persist*/);
4930+
DurabilityActiveStreamTest::setUp(false /*persist*/);
49364931
ASSERT_TRUE(producer);
49374932
auto* activeStream = DurabilityActiveStreamTest::stream.get();
49384933
ASSERT_TRUE(activeStream);
@@ -5187,8 +5182,7 @@ void DurabilityPromotionStreamTest::
51875182

51885183
// 3) Simulate vbstate-change Replica->Active (we have also a Producer and
51895184
// ActiveStream from this point onward)
5190-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/,
5191-
false /*persist*/);
5185+
DurabilityActiveStreamTest::setUp(false /*persist*/);
51925186
ASSERT_TRUE(producer);
51935187
auto* activeStream = DurabilityActiveStreamTest::stream.get();
51945188
ASSERT_TRUE(activeStream);
@@ -5392,8 +5386,7 @@ TEST_P(DurabilityPromotionStreamTest,
53925386

53935387
// Simulate vbstate-change Replica->Active (we have also a Producer and
53945388
// ActiveStream from this point onward)
5395-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/,
5396-
false /*persist*/);
5389+
DurabilityActiveStreamTest::setUp(false /*persist*/);
53975390
ASSERT_TRUE(producer);
53985391
auto* activeStream = DurabilityActiveStreamTest::stream.get();
53995392
ASSERT_TRUE(activeStream);
@@ -5528,8 +5521,7 @@ TEST_P(DurabilityPromotionStreamTest,
55285521

55295522
// Simulate vbstate-change Replica->Active (we have also a Producer and
55305523
// ActiveStream from this point onward)
5531-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/,
5532-
false /*persist*/);
5524+
DurabilityActiveStreamTest::setUp(false /*persist*/);
55335525
ASSERT_TRUE(producer);
55345526
auto* activeStream = DurabilityActiveStreamTest::stream.get();
55355527
ASSERT_TRUE(activeStream);
@@ -5622,7 +5614,7 @@ TEST_P(DurabilityPromotionStreamTest, ReplicaDeadActiveCanCommitPrepare) {
56225614

56235615
// Simulate vbstate-change Dead->Active (we have also a Producer and
56245616
// ActiveStream from this point onward)
5625-
DurabilityActiveStreamTest::setUp(true /*startCheckpointProcessorTask*/);
5617+
DurabilityActiveStreamTest::setUp();
56265618
ASSERT_TRUE(producer);
56275619
auto* activeStream = DurabilityActiveStreamTest::stream.get();
56285620
ASSERT_TRUE(activeStream);

engines/ep/tests/module_tests/dcp_durability_stream_test.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class DurabilityActiveStreamTest
2525
/**
2626
* Does the DurabilityActiveStreamTest specific setup
2727
*/
28-
void setUp(bool startCheckpointProcessorTask, bool persist = true);
28+
void setUp(bool persist = true);
2929

3030
/*
3131
* Queues a Prepare and verifies that the corresponding DCP_PREPARE

engines/ep/tests/module_tests/dcp_stream_ephemeral_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ TEST_P(EphemeralStreamTest, EphemeralBackfillSnapshotHasNoDuplicates) {
9090
store_item(vbid, "key2", "value1");
9191
}
9292

93-
removeCheckpoint(numItems);
93+
removeCheckpoint();
9494

9595
/* Set up a DCP stream for the backfill */
9696
setup_dcp_stream();

engines/ep/tests/module_tests/dcp_stream_sync_repl_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ void DcpStreamSyncReplTest::testBackfillPrepare(DocumentState docState,
545545
// Store a pending item then remove the checkpoint to force backfill.
546546
using cb::durability::Level;
547547
auto prepared = storePending(docState, "1", "X", {level, {}});
548-
removeCheckpoint(1);
548+
removeCheckpoint();
549549

550550
// Create sync repl DCP stream
551551
setup_dcp_stream(0,
@@ -623,7 +623,7 @@ void DcpStreamSyncReplTest::testBackfillPrepareCommit(
623623
EXPECT_EQ(1, pdm.getNumTracked());
624624
EXPECT_EQ(0, pdm.getHighCompletedSeqno());
625625

626-
removeCheckpoint(1);
626+
removeCheckpoint();
627627
EXPECT_EQ(1, vb0->getPersistenceSeqno());
628628

629629
EXPECT_EQ(cb::engine_errc::success,
@@ -635,7 +635,7 @@ void DcpStreamSyncReplTest::testBackfillPrepareCommit(
635635
EXPECT_EQ(0, pdm.getNumTracked());
636636
EXPECT_EQ(1, pdm.getHighCompletedSeqno());
637637

638-
removeCheckpoint(1);
638+
removeCheckpoint();
639639
EXPECT_EQ(2, vb0->getPersistenceSeqno());
640640

641641
// Create sync repl DCP stream
@@ -730,7 +730,7 @@ void DcpStreamSyncReplTest::testBackfillPrepareAbort(
730730
{},
731731
vb0->lockCollections(prepared->getKey())));
732732
}
733-
removeCheckpoint(2);
733+
removeCheckpoint();
734734

735735
// Create sync repl DCP stream
736736
setup_dcp_stream(0,

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -885,12 +885,12 @@ TEST_P(StreamTest, BackfillSmallBuffer) {
885885
size of a mutation */
886886
producer->setBackfillBufferSize(1);
887887

888+
ASSERT_TRUE(stream->isBackfilling());
889+
ASSERT_EQ(stream->getNumBackfillPauses(), 0);
890+
888891
/* We want the backfill task to run in a background thread */
889892
ExecutorPool::get()->setNumAuxIO(ThreadPoolConfig::AuxIoThreadCount{1});
890893

891-
EXPECT_EQ(stream->getNumBackfillPauses(), 0);
892-
stream->transitionStateToBackfilling();
893-
894894
/* Backfill can only read 1 as its buffer will become full after that */
895895
cb::waitForPredicate(
896896
[&] { return (numItems - 1) == stream->getLastBackfilledSeqno(); });
@@ -1404,7 +1404,7 @@ class CacheCallbackTest : public StreamTest {
14041404
StreamTest::SetUp();
14051405
store_item(vbid, key, "value");
14061406

1407-
removeCheckpoint(numItems);
1407+
removeCheckpoint();
14081408

14091409
/* Set up a DCP stream for the backfill */
14101410
setup_dcp_stream();
@@ -1571,16 +1571,8 @@ void SingleThreadedActiveStreamTest::TearDown() {
15711571
STParameterizedBucketTest::TearDown();
15721572
}
15731573

1574-
void SingleThreadedActiveStreamTest::startCheckpointTask() {
1575-
if (!producer->getCheckpointSnapshotTask()) {
1576-
producer->createCheckpointProcessorTask();
1577-
producer->scheduleCheckpointProcessorTask();
1578-
}
1579-
}
1580-
15811574
void SingleThreadedActiveStreamTest::setupProducer(
1582-
const std::vector<std::pair<std::string, std::string>>& controls,
1583-
bool startCheckpointProcessorTask) {
1575+
const std::vector<std::pair<std::string, std::string>>& controls) {
15841576
uint32_t flags = 0;
15851577

15861578
// We don't set the startTask flag here because we will create the task
@@ -1592,10 +1584,7 @@ void SingleThreadedActiveStreamTest::setupProducer(
15921584
flags,
15931585
false /*startTask*/);
15941586
producer->createCheckpointProcessorTask();
1595-
1596-
if (startCheckpointProcessorTask) {
1597-
producer->scheduleCheckpointProcessorTask();
1598-
}
1587+
producer->scheduleCheckpointProcessorTask();
15991588

16001589
for (const auto& c : controls) {
16011590
EXPECT_EQ(cb::engine_errc::success,
@@ -3433,7 +3422,7 @@ TEST_P(SingleThreadedActiveToPassiveStreamTest,
34333422
TEST_P(SingleThreadedActiveStreamTest,
34343423
FirstSnapshotHasRequestedStartSnapSeqno) {
34353424
// Replace initial stream with one registered with DCP producer.
3436-
setupProducer({}, true);
3425+
setupProducer({});
34373426

34383427
auto& vb = *store->getVBucket(vbid);
34393428
auto& cm = *vb.checkpointManager;
@@ -3486,7 +3475,7 @@ TEST_P(SingleThreadedActiveStreamTest,
34863475
*/
34873476
TEST_P(SingleThreadedActiveStreamTest, MetaOnlyCheckpointsSkipped) {
34883477
// Replace initial stream with one registered with DCP producer.
3489-
setupProducer({}, true);
3478+
setupProducer({});
34903479

34913480
auto& vb = *store->getVBucket(vbid);
34923481
auto& cm = *vb.checkpointManager;
@@ -3759,11 +3748,7 @@ TEST_P(SingleThreadedActiveStreamTest, CompleteBackfillRaceNoStreamEnd) {
37593748
// seqno(s) until another mutation for that vBucket occurs.
37603749
TEST_P(SingleThreadedActiveStreamTest,
37613750
RaceBetweenNotifyAndProcessingExistingItems) {
3762-
// Replace initial stream with one registered with DCP producer.
3763-
setupProducer({}, true);
3764-
37653751
auto vb = engine->getVBucket(vbid);
3766-
producer->scheduleCheckpointProcessorTask();
37673752
stream = producer->mockActiveStreamRequest(0,
37683753
/*opaque*/ 0,
37693754
*vb,
@@ -6040,7 +6025,6 @@ void CDCActiveStreamTest::SetUp() {
60406025
producer->control(0, DcpControlKeys::ChangeStreams, "true"));
60416026
ASSERT_TRUE(producer->areChangeStreamsEnabled());
60426027
producer->public_enableSyncReplication();
6043-
startCheckpointTask();
60446028

60456029
recreateStream(*vb,
60466030
true,

engines/ep/tests/module_tests/dcp_stream_test.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,8 @@ class SingleThreadedActiveStreamTest
4747
void SetUp() override;
4848
void TearDown() override;
4949

50-
void startCheckpointTask();
51-
5250
void setupProducer(const std::vector<std::pair<std::string, std::string>>&
53-
controls = {},
54-
bool startCheckpointProcessorTask = false);
51+
controls = {});
5552

5653
MutationStatus public_processSet(VBucket& vb,
5754
Item& item,

0 commit comments

Comments
 (0)