Skip to content

Commit 525edae

Browse files
jimwwalkerdaverigby
authored andcommitted
[BP] MB-57400: Use an estimated items_remaining count for Prometheus stats
[[Backport to 7.1.x for maintenance patch]] Add to CheckpointManager an option to get an estimate of the remaining items for a cursor - the estimate should provide near constant performance as it just asks each the checkpoint from curstor to end for an item count (read of a counter). The max checkpoints is 10, so the estimated call will only iterate 10 at a maximum. The estimate will be inaccurate if the cursor is partially through a checkpoint and will over-count for items already processed. For the Prometheus requests use the estimated statistic for. 1) The aggregated DCP connection stats (high-cardinality) 2) The aggregated DCP connection stats by type (low-cardinality) but only when the type is !replication. For cmd_stat, i.e. dcpagg use the accurate mode, so no change from before this commit. (cherry picked from commit c6397f2) Change-Id: I712c2a7566735849130092d0f1db07d975cdb2c0 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/192799 Well-Formed: Restriction Checker Reviewed-by: Paolo Cocchi <[email protected]> Tested-by: Dave Rigby <[email protected]>
1 parent 2fac253 commit 525edae

File tree

13 files changed

+148
-46
lines changed

13 files changed

+148
-46
lines changed

engines/ep/src/checkpoint_manager.cc

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,18 +1257,21 @@ void CheckpointManager::checkOpenCheckpoint(
12571257
}
12581258
}
12591259

1260-
size_t CheckpointManager::getNumItemsForCursor(
1261-
const CheckpointCursor* cursor) const {
1260+
size_t CheckpointManager::getNumItemsForCursor(const CheckpointCursor* cursor,
1261+
bool accurate) const {
12621262
std::lock_guard<std::mutex> lh(queueLock);
1263-
return getNumItemsForCursor(lh, cursor);
1263+
return getNumItemsForCursor(lh, cursor, accurate);
12641264
}
12651265

12661266
size_t CheckpointManager::getNumItemsForCursor(
12671267
const std::lock_guard<std::mutex>& lh,
1268-
const CheckpointCursor* cursor) const {
1268+
const CheckpointCursor* cursor,
1269+
bool accurate) const {
12691270
if (cursor && cursor->valid()) {
1270-
size_t items = cursor->getRemainingItemsCount();
12711271
CheckpointList::const_iterator chkptIterator(cursor->getCheckpoint());
1272+
size_t items = accurate ? cursor->getRemainingItemsCount()
1273+
: (*chkptIterator)->getNumItems();
1274+
12721275
if (chkptIterator != checkpointList.end()) {
12731276
++chkptIterator;
12741277
}
@@ -1595,7 +1598,7 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
15951598
"vb_%d:num_items_for_persistence",
15961599
vbucketId.get());
15971600
add_casted_stat(buf.data(),
1598-
getNumItemsForCursor(lh, persistenceCursor),
1601+
getNumItemsForCursor(lh, persistenceCursor, true),
15991602
add_stat,
16001603
cookie);
16011604
}
@@ -1635,10 +1638,21 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
16351638
"vb_%d:%s:num_items_for_cursor",
16361639
vbucketId.get(),
16371640
cursor.second->getName().c_str());
1638-
add_casted_stat(buf.data(),
1639-
getNumItemsForCursor(lh, cursor.second.get()),
1640-
add_stat,
1641-
cookie);
1641+
add_casted_stat(
1642+
buf.data(),
1643+
getNumItemsForCursor(lh, cursor.second.get(), true),
1644+
add_stat,
1645+
cookie);
1646+
checked_snprintf(buf.data(),
1647+
buf.size(),
1648+
"vb_%d:%s:estimated_num_items_for_cursor",
1649+
vbucketId.get(),
1650+
cursor.second->getName().c_str());
1651+
add_casted_stat(
1652+
buf.data(),
1653+
getNumItemsForCursor(lh, cursor.second.get(), false),
1654+
add_stat,
1655+
cookie);
16421656
}
16431657
}
16441658

engines/ep/src/checkpoint_manager.h

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,11 +366,19 @@ class CheckpointManager {
366366
* As such it is *not* safe to use when a precise count of remaining
367367
* items is needed.
368368
*
369-
* Returns the count of Items (excluding meta items) that the given cursor
369+
* @param cursor The cursor for which the caller want to know the item count
370+
* @param accurate if true, then the function will perform a count from the
371+
* the cursor to the end of the current checkpoint. If false the
372+
* total item count of the current checkpoint is used in the sum.
373+
* Warning that accurate=true is an O(n) cost, where n is the number
374+
* of items, it can be slow and trigger problems in other threads
375+
* that want to access the CheckpointManager (MB-57000)
376+
* @return the count of Items (excluding meta items) that the given cursor
370377
* has yet to process (i.e. between the cursor's current position and the
371-
* end of the last checkpoint).
378+
* end of the last checkpoint). Note: see param accurate for more detail
372379
*/
373-
size_t getNumItemsForCursor(const CheckpointCursor* cursor) const;
380+
size_t getNumItemsForCursor(const CheckpointCursor* cursor,
381+
bool accurate = true) const;
374382

375383
/* WARNING! This method can return inaccurate counts - see MB-28431. It
376384
* at *least* can suffer from overcounting by at least 1 (in scenarios as
@@ -741,7 +749,8 @@ class CheckpointManager {
741749
* @return number of items to be processed
742750
*/
743751
size_t getNumItemsForCursor(const std::lock_guard<std::mutex>& lh,
744-
const CheckpointCursor* cursor) const;
752+
const CheckpointCursor* cursor,
753+
bool accurate) const;
745754

746755
/**
747756
* Clears this CM, effectively removing all checkpoints in the list and

engines/ep/src/connhandler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ class ConnHandler : public DcpConnHandlerIface {
280280

281281
virtual void addStats(const AddStatFn& add_stat, const CookieIface* c);
282282

283-
virtual void aggregateQueueStats(ConnCounter& stats_aggregator) const {
283+
virtual void aggregateQueueStats(ConnCounter& stats_aggregator,
284+
bool accurateItemsRemaining) const {
284285
// Empty
285286
}
286287

engines/ep/src/dcp/active_stream.cc

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2103,7 +2103,7 @@ void ActiveStream::transitionState(StreamState newState) {
21032103
}
21042104
}
21052105

2106-
size_t ActiveStream::getItemsRemaining() {
2106+
size_t ActiveStream::getItemsRemaining(bool accurateItemsRemaining) {
21072107
VBucketPtr vbucket = engine->getVBucket(vb_);
21082108

21092109
if (!vbucket || !isActive()) {
@@ -2113,9 +2113,20 @@ size_t ActiveStream::getItemsRemaining() {
21132113
// Items remaining is the sum of:
21142114
// (a) Items outstanding in checkpoints
21152115
// (b) Items pending in our readyQ, excluding any meta items.
2116+
//
2117+
// Note: if accurateItemsRemaining=true this call could be slow, the
2118+
// returned value will count from the cursor to the end of the checkpoint.
2119+
// That's an O(n) n=number-items.
2120+
// if accurateItemsRemaining=false the returned value represents the sum of
2121+
// items for current and all outstanding checkpoints, so is inaccurate in
2122+
// worst case by chk_max_items - 1 (if the cursor happens to be at the end
2123+
// of the checkpoint). The cost of the estimated call is O(n) where is
2124+
// number of checkpoints (and that is capped by max_checkpoints, 10 as
2125+
// default).
21162126
size_t ckptItems = 0;
21172127
if (auto sp = cursor.lock()) {
2118-
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(sp.get());
2128+
ckptItems = vbucket->checkpointManager->getNumItemsForCursor(
2129+
sp.get(), accurateItemsRemaining);
21192130
}
21202131
return ckptItems + readyQ_non_meta_items;
21212132
}

engines/ep/src/dcp/active_stream.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ class ActiveStream : public Stream,
273273
/* Returns a count of how many items are outstanding to be sent for this
274274
* stream's vBucket.
275275
*/
276-
size_t getItemsRemaining();
276+
size_t getItemsRemaining(bool accurateItemsRemaining = true);
277277

278278
uint64_t getLastReadSeqno() const;
279279

engines/ep/src/dcp/consumer.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1204,7 +1204,8 @@ void DcpConsumer::addStats(const AddStatFn& add_stat, const CookieIface* c) {
12041204
addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
12051205
}
12061206

1207-
void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator) const {
1207+
void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator,
1208+
bool accurateItemsRemaining) const {
12081209
aggregator.conn_queueBackoff += backoffs;
12091210
}
12101211

engines/ep/src/dcp/consumer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,8 @@ class DcpConsumer : public ConnHandler,
258258

259259
void addStats(const AddStatFn& add_stat, const CookieIface* c) override;
260260

261-
void aggregateQueueStats(ConnCounter& aggregator) const override;
261+
void aggregateQueueStats(ConnCounter& aggregator,
262+
bool accurateItemsRemaining) const override;
262263

263264
void notifyStreamReady(Vbid vbucket);
264265

engines/ep/src/dcp/producer.cc

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1560,12 +1560,13 @@ void DcpProducer::addTakeoverStats(const AddStatFn& add_stat,
15601560
add_casted_stat("backfillRemaining", 0, add_stat, c);
15611561
}
15621562

1563-
void DcpProducer::aggregateQueueStats(ConnCounter& aggregator) const {
1563+
void DcpProducer::aggregateQueueStats(ConnCounter& aggregator,
1564+
bool accurateItemsRemaining) const {
15641565
++aggregator.totalProducers;
15651566
aggregator.conn_queueDrain += itemsSent;
15661567
aggregator.conn_totalBytes += totalBytesSent;
15671568
aggregator.conn_totalUncompressedDataSize += totalUncompressedDataSize;
1568-
auto streamAggStats = getStreamAggStats();
1569+
auto streamAggStats = getStreamAggStats(accurateItemsRemaining);
15691570

15701571
aggregator.conn_queueRemaining += streamAggStats.itemsRemaining;
15711572
aggregator.conn_queueMemory += streamAggStats.readyQueueMemory;
@@ -1908,17 +1909,19 @@ size_t DcpProducer::getItemsRemaining() const {
19081909
return remainingSize;
19091910
}
19101911

1911-
DcpProducer::StreamAggStats DcpProducer::getStreamAggStats() const {
1912+
DcpProducer::StreamAggStats DcpProducer::getStreamAggStats(
1913+
bool accurateItemsRemaining) const {
19121914
DcpProducer::StreamAggStats stats{};
19131915

19141916
std::for_each(
19151917
streams->begin(),
19161918
streams->end(),
1917-
[&stats](const StreamsMap::value_type& vt) {
1919+
[&stats, accurateItemsRemaining](const StreamsMap::value_type& vt) {
19181920
for (auto itr = vt.second->rlock(); !itr.end(); itr.next()) {
19191921
auto* as = itr.get().get();
19201922
if (as) {
1921-
stats.itemsRemaining += as->getItemsRemaining();
1923+
stats.itemsRemaining +=
1924+
as->getItemsRemaining(accurateItemsRemaining);
19221925
stats.readyQueueMemory += as->getReadyQueueMemory();
19231926
}
19241927
}

engines/ep/src/dcp/producer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@ class DcpProducer : public ConnHandler,
121121
const CookieIface* c,
122122
const VBucket& vb);
123123

124-
void aggregateQueueStats(ConnCounter& aggregator) const override;
124+
void aggregateQueueStats(ConnCounter& aggregator,
125+
bool accurateItemsRemaining) const override;
125126

126127
/**
127128
* ALERT: Do NOT call this function while holding ConnMap::connLock.
@@ -452,7 +453,7 @@ class DcpProducer : public ConnHandler,
452453
size_t readyQueueMemory;
453454
};
454455

455-
StreamAggStats getStreamAggStats() const;
456+
StreamAggStats getStreamAggStats(bool accurateItemsRemaining) const;
456457

457458
/**
458459
* Map the cb::mcbp::DcpStreamEndStatus to one the client can understand.

engines/ep/src/ep_engine.cc

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,9 @@ cb::engine_errc EventuallyPersistentEngine::get_prometheus_stats(
389389
const std::shared_ptr<ConnHandler>& tc) {
390390
++aggregator.totalConns;
391391
if (auto tp = std::dynamic_pointer_cast<DcpProducer>(tc); tp) {
392-
tp->aggregateQueueStats(aggregator);
392+
// Do not use the potentially slower "accurate" items
393+
// remaining. MB-57400
394+
tp->aggregateQueueStats(aggregator, false);
393395
}
394396
});
395397
addAggregatedProducerStats(collector, aggregator);
@@ -402,7 +404,7 @@ cb::engine_errc EventuallyPersistentEngine::get_prometheus_stats(
402404
}
403405
// do dcp aggregated stats, using ":" as the separator to split
404406
// connection names to find the connection type.
405-
if (status = doConnAggStatsInner(collector, ":");
407+
if (status = doConnAggStatsInner(collector, ":", false);
406408
status != cb::engine_errc::success) {
407409
return status;
408410
}
@@ -3916,7 +3918,7 @@ struct ConnStatBuilder {
39163918
tc->addStats(add_stat, cookie);
39173919
auto tp = std::dynamic_pointer_cast<DcpProducer>(tc);
39183920
if (tp) {
3919-
tp->aggregateQueueStats(aggregator);
3921+
tp->aggregateQueueStats(aggregator, true);
39203922
}
39213923
}
39223924
}
@@ -3932,7 +3934,18 @@ struct ConnStatBuilder {
39323934
};
39333935

39343936
struct ConnAggStatBuilder {
3935-
ConnAggStatBuilder(std::string_view sep) : sep(sep) {
3937+
/**
3938+
* Construct with the separator and a configuration bool.
3939+
* @param sep The separator used for determining "type" of DCP connection
3940+
* by splitting the connection name with sep.
3941+
* @param alwaysUseAccurateItemsRemaining if true the aggregated stats will
3942+
* include an "accurate" items-remaining. If false only connections
3943+
* labelled as "replication" will use an accurate value. See MB-57400
3944+
*/
3945+
ConnAggStatBuilder(std::string_view sep,
3946+
bool alwaysUseAccurateItemsRemaining)
3947+
: sep(sep),
3948+
alwaysUseAccurateItemsRemaining(alwaysUseAccurateItemsRemaining) {
39363949
}
39373950

39383951
/**
@@ -3950,22 +3963,23 @@ struct ConnAggStatBuilder {
39503963
* returns nullptr.
39513964
*
39523965
* @param tc connection
3953-
* @return counter for the given connection, or nullptr
3966+
* @return pair of counter for the given connection, or nullptr and true
3967+
* if the connection is considered a replication stream
39543968
*/
3955-
ConnCounter* getCounterForConnType(std::string_view name) {
3969+
std::pair<ConnCounter*, bool> getCounterForConnType(std::string_view name) {
39563970
// strip everything upto and including the first colon,
39573971
// e.g., "eq_dcpq:"
39583972
size_t pos1 = name.find(':');
39593973
if (pos1 == std::string_view::npos) {
3960-
return nullptr;
3974+
return {nullptr, false};
39613975
}
39623976

39633977
name.remove_prefix(pos1 + 1);
39643978

39653979
// find the given separator
39663980
size_t pos2 = name.find(sep);
39673981
if (pos2 == std::string_view::npos) {
3968-
return nullptr;
3982+
return {nullptr, false};
39693983
}
39703984

39713985
// extract upto given separator e.g.,
@@ -3976,14 +3990,18 @@ struct ConnAggStatBuilder {
39763990
// prefix is "replication"
39773991
std::string prefix(name.substr(0, pos2));
39783992

3979-
return &counters[prefix];
3993+
// class is created with the itemsRemaining "policy" - which if not true
3994+
// replication will always use the accurate items remaining - all
3995+
// other connection types use a faster estimate.
3996+
return {&counters[prefix],
3997+
alwaysUseAccurateItemsRemaining || prefix == "replication"};
39803998
}
39813999

3982-
void aggregate(ConnHandler& conn, ConnCounter* tc) {
4000+
void aggregate(ConnHandler& conn, ConnCounter* tc, bool isReplication) {
39834001
ConnCounter counter;
39844002
++counter.totalConns;
39854003

3986-
conn.aggregateQueueStats(counter);
4004+
conn.aggregateQueueStats(counter, isReplication);
39874005

39884006
ConnCounter& total = getTotalCounter();
39894007
total += counter;
@@ -3999,8 +4017,8 @@ struct ConnAggStatBuilder {
39994017

40004018
void operator()(std::shared_ptr<ConnHandler> tc) {
40014019
if (tc) {
4002-
ConnCounter* aggregator = getCounterForConnType(tc->getName());
4003-
aggregate(*tc, aggregator);
4020+
auto aggregator = getCounterForConnType(tc->getName());
4021+
aggregate(*tc, aggregator.first, aggregator.second);
40044022
}
40054023
}
40064024

@@ -4010,6 +4028,7 @@ struct ConnAggStatBuilder {
40104028

40114029
std::map<std::string, ConnCounter> counters;
40124030
std::string_view sep;
4031+
const bool alwaysUseAccurateItemsRemaining{false};
40134032
};
40144033

40154034
/// @endcond
@@ -4102,8 +4121,8 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
41024121
// write them as responses (which would be racy).
41034122
// a later call to maybeWriteResponse will do that.
41044123
CBStatCollector col(deferredAddStat, cookie);
4105-
ep->doConnAggStatsInner(col.forBucket(ep->getName()),
4106-
separator);
4124+
ep->doConnAggStatsInner(
4125+
col.forBucket(ep->getName()), separator, true);
41074126
return cb::engine_errc::success;
41084127
});
41094128
ExecutorPool::get()->schedule(task);
@@ -4115,13 +4134,15 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
41154134
}
41164135

41174136
cb::engine_errc EventuallyPersistentEngine::doConnAggStatsInner(
4118-
const BucketStatCollector& collector, std::string_view sep) {
4137+
const BucketStatCollector& collector,
4138+
std::string_view sep,
4139+
bool cmdStat) {
41194140
// The separator is, in all current usage, ":" so the length will
41204141
// normally be 1
41214142
const size_t max_sep_len(8);
41224143
sep = sep.substr(0, max_sep_len);
41234144

4124-
ConnAggStatBuilder visitor(sep);
4145+
ConnAggStatBuilder visitor(sep, cmdStat);
41254146
dcpConnMap_->each(visitor);
41264147

41274148
for (const auto& [connType, counter] : visitor.getCounters()) {

0 commit comments

Comments
 (0)