Skip to content

Commit 87dd420

Browse files
committed
Revert "MB-57400: Use an estimated items_remaining count for Prometheus stats"
This reverts commit c6397f2. - Reason for revert - After the backport of MB-52276 to Neo "estimate" is no longer needed. This is the second of 2 reverts for MB-57400. Change-Id: I7f6493706a30db76bcf131aea14f3a6ff5cbe95e Reviewed-on: https://review.couchbase.org/c/kv_engine/+/194197 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 6dcc716 commit 87dd420

File tree

13 files changed

+44
-131
lines changed

13 files changed

+44
-131
lines changed

engines/ep/src/checkpoint_manager.cc

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,16 +1355,15 @@ void CheckpointManager::checkOpenCheckpoint(
13551355
}
13561356
}
13571357

1358-
size_t CheckpointManager::getNumItemsForCursor(const CheckpointCursor* cursor,
1359-
bool accurate) const {
1358+
size_t CheckpointManager::getNumItemsForCursor(
1359+
const CheckpointCursor* cursor) const {
13601360
std::lock_guard<std::mutex> lh(queueLock);
1361-
return getNumItemsForCursor(lh, cursor, accurate);
1361+
return getNumItemsForCursor(lh, cursor);
13621362
}
13631363

13641364
size_t CheckpointManager::getNumItemsForCursor(
13651365
const std::lock_guard<std::mutex>& lh,
1366-
const CheckpointCursor* cursor,
1367-
bool accurate) const {
1366+
const CheckpointCursor* cursor) const {
13681367
if (cursor && cursor->valid()) {
13691368
size_t items = cursor->getRemainingItemsInCurrentCheckpoint();
13701369
CheckpointList::const_iterator chkptIterator(cursor->getCheckpoint());
@@ -1690,7 +1689,7 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
16901689
"vb_%d:num_items_for_persistence",
16911690
vbucketId.get());
16921691
add_casted_stat(buf.data(),
1693-
getNumItemsForCursor(lh, persistenceCursor, true),
1692+
getNumItemsForCursor(lh, persistenceCursor),
16941693
add_stat,
16951694
cookie);
16961695
}
@@ -1730,21 +1729,10 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
17301729
"vb_%d:%s:num_items_for_cursor",
17311730
vbucketId.get(),
17321731
cursor.second->getName().c_str());
1733-
add_casted_stat(
1734-
buf.data(),
1735-
getNumItemsForCursor(lh, cursor.second.get(), true),
1736-
add_stat,
1737-
cookie);
1738-
checked_snprintf(buf.data(),
1739-
buf.size(),
1740-
"vb_%d:%s:estimated_num_items_for_cursor",
1741-
vbucketId.get(),
1742-
cursor.second->getName().c_str());
1743-
add_casted_stat(
1744-
buf.data(),
1745-
getNumItemsForCursor(lh, cursor.second.get(), false),
1746-
add_stat,
1747-
cookie);
1732+
add_casted_stat(buf.data(),
1733+
getNumItemsForCursor(lh, cursor.second.get()),
1734+
add_stat,
1735+
cookie);
17481736
}
17491737
}
17501738

engines/ep/src/checkpoint_manager.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,8 @@ class CheckpointManager {
371371
* Returns the count of all items (empty item excluded) that the given
372372
* cursor has yet to process (i.e. between the cursor's current position and
373373
* the end of the last checkpoint).
374-
*
375-
* @param cursor The cursor for which the caller want to know the item count
376-
* @param accurate [unused] Added in MB-57400, @todo: remove
377374
*/
378-
size_t getNumItemsForCursor(const CheckpointCursor* cursor,
379-
bool accurate = true) const;
375+
size_t getNumItemsForCursor(const CheckpointCursor* cursor) const;
380376

381377
/* WARNING! This method can return inaccurate counts - see MB-28431. It
382378
* at *least* can suffer from overcounting by at least 1 (in scenarios as
@@ -746,8 +742,7 @@ class CheckpointManager {
746742
* @return number of items to be processed
747743
*/
748744
size_t getNumItemsForCursor(const std::lock_guard<std::mutex>& lh,
749-
const CheckpointCursor* cursor,
750-
bool accurate) const;
745+
const CheckpointCursor* cursor) const;
751746

752747
/**
753748
* Clears this CM, effectively removing all checkpoints in the list and

engines/ep/src/connhandler.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,7 @@ class ConnHandler : public DcpConnHandlerIface {
289289

290290
virtual void addStats(const AddStatFn& add_stat, const CookieIface* c);
291291

292-
virtual void aggregateQueueStats(ConnCounter& stats_aggregator,
293-
bool accurateItemsRemaining) const {
292+
virtual void aggregateQueueStats(ConnCounter& stats_aggregator) const {
294293
// Empty
295294
}
296295

engines/ep/src/dcp/active_stream.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2263,7 +2263,7 @@ void ActiveStream::transitionState(StreamState newState) {
22632263
}
22642264
}
22652265

2266-
size_t ActiveStream::getItemsRemaining(bool accurateItemsRemaining) {
2266+
size_t ActiveStream::getItemsRemaining() {
22672267
VBucketPtr vbucket = engine->getVBucket(vb_);
22682268

22692269
if (!vbucket || !isActive()) {
@@ -2275,8 +2275,7 @@ size_t ActiveStream::getItemsRemaining(bool accurateItemsRemaining) {
22752275
// (b) Items pending in our readyQ
22762276
size_t ckptItems = 0;
22772277
if (auto sp = cursor.lock()) {
2278-
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(
2279-
sp.get(), accurateItemsRemaining);
2278+
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(sp.get());
22802279
}
22812280

22822281
// Note: concurrent access to readyQ guarded by streamMutex

engines/ep/src/dcp/active_stream.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,10 +286,8 @@ class ActiveStream : public Stream,
286286
/**
287287
* Returns a count of how many items are outstanding to be sent for this
288288
* stream's vBucket.
289-
*
290-
* @param accurate [unused] Added in MB-57400, @todo: remove
291289
*/
292-
size_t getItemsRemaining(bool accurateItemsRemaining = true);
290+
size_t getItemsRemaining();
293291

294292
/// @returns the count of items backfilled from disk.
295293
size_t getBackfillItemsDisk() const;

engines/ep/src/dcp/consumer.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,8 +1246,7 @@ void DcpConsumer::addStats(const AddStatFn& add_stat, const CookieIface* c) {
12461246
addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
12471247
}
12481248

1249-
void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator,
1250-
bool accurateItemsRemaining) const {
1249+
void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator) const {
12511250
aggregator.conn_queueBackoff += backoffs;
12521251
}
12531252

engines/ep/src/dcp/consumer.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,7 @@ class DcpConsumer : public ConnHandler,
258258

259259
void addStats(const AddStatFn& add_stat, const CookieIface* c) override;
260260

261-
void aggregateQueueStats(ConnCounter& aggregator,
262-
bool accurateItemsRemaining) const override;
261+
void aggregateQueueStats(ConnCounter& aggregator) const override;
263262

264263
void notifyStreamReady(Vbid vbucket);
265264

engines/ep/src/dcp/producer.cc

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,13 +1610,12 @@ void DcpProducer::addTakeoverStats(const AddStatFn& add_stat,
16101610
add_casted_stat("backfillRemaining", 0, add_stat, c);
16111611
}
16121612

1613-
void DcpProducer::aggregateQueueStats(ConnCounter& aggregator,
1614-
bool accurateItemsRemaining) const {
1613+
void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) const {
16151614
++aggregator.totalProducers;
16161615
aggregator.conn_queueDrain += itemsSent;
16171616
aggregator.conn_totalBytes += totalBytesSent;
16181617
aggregator.conn_totalUncompressedDataSize += totalUncompressedDataSize;
1619-
auto streamAggStats = getStreamAggStats(accurateItemsRemaining);
1618+
auto streamAggStats = getStreamAggStats();
16201619

16211620
aggregator.conn_queueRemaining += streamAggStats.itemsRemaining;
16221621
aggregator.conn_queueMemory += streamAggStats.readyQueueMemory;
@@ -1978,19 +1977,17 @@ size_t DcpProducer::getItemsRemaining() const {
19781977
return remainingSize;
19791978
}
19801979

1981-
DcpProducer::StreamAggStats DcpProducer::getStreamAggStats(
1982-
bool accurateItemsRemaining) const {
1980+
DcpProducer::StreamAggStats DcpProducer::getStreamAggStats() const {
19831981
DcpProducer::StreamAggStats stats;
19841982

19851983
std::for_each(
19861984
streams->begin(),
19871985
streams->end(),
1988-
[&stats, accurateItemsRemaining](const StreamsMap::value_type& vt) {
1986+
[&stats](const StreamsMap::value_type& vt) {
19891987
for (auto itr = vt.second->rlock(); !itr.end(); itr.next()) {
19901988
auto* as = itr.get().get();
19911989
if (as) {
1992-
stats.itemsRemaining +=
1993-
as->getItemsRemaining(accurateItemsRemaining);
1990+
stats.itemsRemaining += as->getItemsRemaining();
19941991
stats.readyQueueMemory += as->getReadyQueueMemory();
19951992
stats.backfillItemsDisk += as->getBackfillItemsDisk();
19961993
stats.backfillItemsMemory +=

engines/ep/src/dcp/producer.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ class DcpProducer : public ConnHandler,
123123
const CookieIface* c,
124124
const VBucket& vb);
125125

126-
void aggregateQueueStats(ConnCounter& aggregator,
127-
bool accurateItemsRemaining) const override;
126+
void aggregateQueueStats(ConnCounter& aggregator) const override;
128127

129128
/**
130129
* ALERT: Do NOT call this function while holding ConnMap::connLock.
@@ -481,7 +480,7 @@ class DcpProducer : public ConnHandler,
481480
size_t backfillItemsMemory{};
482481
};
483482

484-
StreamAggStats getStreamAggStats(bool accurateItemsRemaining) const;
483+
StreamAggStats getStreamAggStats() const;
485484

486485
/**
487486
* Map the cb::mcbp::DcpStreamEndStatus to one the client can understand.

engines/ep/src/ep_engine.cc

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,7 @@ cb::engine_errc EventuallyPersistentEngine::get_prometheus_stats(
389389
const std::shared_ptr<ConnHandler>& tc) {
390390
++aggregator.totalConns;
391391
if (auto tp = std::dynamic_pointer_cast<DcpProducer>(tc); tp) {
392-
// Do not use the potentially slower "accurate" items
393-
// remaining. MB-57400
394-
tp->aggregateQueueStats(aggregator, false);
392+
tp->aggregateQueueStats(aggregator);
395393
}
396394
});
397395
addAggregatedProducerStats(collector, aggregator);
@@ -409,7 +407,7 @@ cb::engine_errc EventuallyPersistentEngine::get_prometheus_stats(
409407

410408
// do dcp aggregated stats, using ":" as the separator to split
411409
// connection names to find the connection type.
412-
if (status = doConnAggStatsInner(collector, ":", false);
410+
if (status = doConnAggStatsInner(collector, ":");
413411
status != cb::engine_errc::success) {
414412
return status;
415413
}
@@ -4043,7 +4041,7 @@ struct ConnStatBuilder {
40434041
tc->addStats(add_stat, cookie);
40444042
auto tp = std::dynamic_pointer_cast<DcpProducer>(tc);
40454043
if (tp) {
4046-
tp->aggregateQueueStats(aggregator, true);
4044+
tp->aggregateQueueStats(aggregator);
40474045
}
40484046
}
40494047
}
@@ -4060,17 +4058,11 @@ struct ConnStatBuilder {
40604058

40614059
struct ConnAggStatBuilder {
40624060
/**
4063-
* Construct with the separator and a configuration bool.
4061+
* Construct with the separator.
40644062
* @param sep The separator used for determining "type" of DCP connection
40654063
* by splitting the connection name with sep.
4066-
* @param alwaysUseAccurateItemsRemaining if true the aggregated stats will
4067-
* include an "accurate" items-remaining. If false only connections
4068-
* labelled as "replication" will use an accurate value. See MB-57400
40694064
*/
4070-
ConnAggStatBuilder(std::string_view sep,
4071-
bool alwaysUseAccurateItemsRemaining)
4072-
: sep(sep),
4073-
alwaysUseAccurateItemsRemaining(alwaysUseAccurateItemsRemaining) {
4065+
ConnAggStatBuilder(std::string_view sep) : sep(sep) {
40744066
}
40754067

40764068
/**
@@ -4088,23 +4080,22 @@ struct ConnAggStatBuilder {
40884080
* returns nullptr.
40894081
*
40904082
* @param tc connection
4091-
* @return pair of counter for the given connection, or nullptr and true
4092-
* if the connection is considered a replication stream
4083+
* @return counter for the given connection, or nullptr
40934084
*/
4094-
std::pair<ConnCounter*, bool> getCounterForConnType(std::string_view name) {
4085+
ConnCounter* getCounterForConnType(std::string_view name) {
40954086
// strip everything upto and including the first colon,
40964087
// e.g., "eq_dcpq:"
40974088
size_t pos1 = name.find(':');
40984089
if (pos1 == std::string_view::npos) {
4099-
return {nullptr, false};
4090+
return nullptr;
41004091
}
41014092

41024093
name.remove_prefix(pos1 + 1);
41034094

41044095
// find the given separator
41054096
size_t pos2 = name.find(sep);
41064097
if (pos2 == std::string_view::npos) {
4107-
return {nullptr, false};
4098+
return nullptr;
41084099
}
41094100

41104101
// extract upto given separator e.g.,
@@ -4115,18 +4106,14 @@ struct ConnAggStatBuilder {
41154106
// prefix is "replication"
41164107
std::string prefix(name.substr(0, pos2));
41174108

4118-
// class is created with the itemsRemaining "policy" - which if not true
4119-
// replication will always use the accurate items remaining - all
4120-
// other connection types use a faster estimate.
4121-
return {&counters[prefix],
4122-
alwaysUseAccurateItemsRemaining || prefix == "replication"};
4109+
return &counters[prefix];
41234110
}
41244111

4125-
void aggregate(ConnHandler& conn, ConnCounter* tc, bool isReplication) {
4112+
void aggregate(ConnHandler& conn, ConnCounter* tc) {
41264113
ConnCounter counter;
41274114
++counter.totalConns;
41284115

4129-
conn.aggregateQueueStats(counter, isReplication);
4116+
conn.aggregateQueueStats(counter);
41304117

41314118
ConnCounter& total = getTotalCounter();
41324119
total += counter;
@@ -4142,8 +4129,8 @@ struct ConnAggStatBuilder {
41424129

41434130
void operator()(std::shared_ptr<ConnHandler> tc) {
41444131
if (tc) {
4145-
auto aggregator = getCounterForConnType(tc->getName());
4146-
aggregate(*tc, aggregator.first, aggregator.second);
4132+
ConnCounter* aggregator = getCounterForConnType(tc->getName());
4133+
aggregate(*tc, aggregator);
41474134
}
41484135
}
41494136

@@ -4153,7 +4140,6 @@ struct ConnAggStatBuilder {
41534140

41544141
std::map<std::string, ConnCounter> counters;
41554142
std::string_view sep;
4156-
const bool alwaysUseAccurateItemsRemaining{false};
41574143
};
41584144

41594145
/// @endcond
@@ -4250,8 +4236,8 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
42504236
// write them as responses (which would be racy).
42514237
// a later call to maybeWriteResponse will do that.
42524238
CBStatCollector col(deferredAddStat, cookie);
4253-
ep->doConnAggStatsInner(
4254-
col.forBucket(ep->getName()), separator, true);
4239+
ep->doConnAggStatsInner(col.forBucket(ep->getName()),
4240+
separator);
42554241
return cb::engine_errc::success;
42564242
});
42574243
ExecutorPool::get()->schedule(task);
@@ -4263,15 +4249,13 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
42634249
}
42644250

42654251
cb::engine_errc EventuallyPersistentEngine::doConnAggStatsInner(
4266-
const BucketStatCollector& collector,
4267-
std::string_view sep,
4268-
bool cmdStat) {
4252+
const BucketStatCollector& collector, std::string_view sep) {
42694253
// The separator is, in all current usage, ":" so the length will
42704254
// normally be 1
42714255
const size_t max_sep_len(8);
42724256
sep = sep.substr(0, max_sep_len);
42734257

4274-
ConnAggStatBuilder visitor(sep, cmdStat);
4258+
ConnAggStatBuilder visitor(sep);
42754259
dcpConnMap_->each(visitor);
42764260

42774261
for (const auto& [connType, counter] : visitor.getCounters()) {

0 commit comments

Comments
 (0)