Skip to content

Commit ac33cd8

Browse files
authored
feat(metrics): Add label for main and other listeners (#4739)
* feat(metrics): Add label for main and other listeners. Per-connection stats are now split according to whether the connection belongs to the main listener or to any other listener. The affected metrics are decorated with a `listener` label whose value is `main` or `other`. Connections on the memcached listener are also labelled `main`. Signed-off-by: Abhijat Malviya <[email protected]>
1 parent 0e35f78 commit ac33cd8

File tree

11 files changed

+143
-24
lines changed

11 files changed

+143
-24
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -698,14 +698,18 @@ void Connection::OnPostMigrateThread() {
698698
}
699699

700700
stats_ = &tl_facade_stats->conn_stats;
701-
++stats_->num_conns;
701+
IncrNumConns();
702702
stats_->read_buf_capacity += io_buf_.Capacity();
703703
}
704704

705705
// Names the current fiber "DflyConnection", points stats_ at
// tl_facade_stats->conn_stats, and caches the listener's main-interface flag
// in is_main_ so later queries do not have to go through the listener.
void Connection::OnConnectionStart() {
706706
ThisFiber::SetName("DflyConnection");
707707

708708
stats_ = &tl_facade_stats->conn_stats;
709+
710+
// is_main_ is only assigned when a listener is attached; with no listener it
// keeps whatever value it already had.
if (const Listener* lsnr = static_cast<Listener*>(listener()); lsnr) {
711+
is_main_ = lsnr->IsMainInterface();
712+
}
709713
}
710714

711715
void Connection::HandleRequests() {
@@ -916,7 +920,16 @@ bool Connection::IsPrivileged() const {
916920
}
917921

918922
// Reports whether this connection belongs to the main interface.
// Diff view: the removed line asked the listener on every call (dereferencing
// it without a null check); the added line returns the cached is_main_ flag.
bool Connection::IsMain() const {
919-
return static_cast<Listener*>(listener())->IsMainInterface();
923+
return is_main_;
924+
}
925+
926+
// Returns true when the cached is_main_ flag is set, or when a listener is
// attached and its protocol is Protocol::MEMCACHE. Returns false otherwise,
// including when there is no listener and is_main_ is not set.
bool Connection::IsMainOrMemcache() const {
927+
// Fast path: the cached flag already answers the question.
if (is_main_) {
928+
return true;
929+
}
930+
931+
const Listener* lsnr = static_cast<Listener*>(listener());
932+
// The null check guards connections that have no listener attached.
return lsnr && lsnr->protocol() == Protocol::MEMCACHE;
920933
}
921934

922935
void Connection::SetName(string name) {
@@ -990,7 +1003,7 @@ void Connection::ConnectionFlow() {
9901003

9911004
ConfigureProvidedBuffer();
9921005

993-
++stats_->num_conns;
1006+
IncrNumConns();
9941007
++stats_->conn_received_cnt;
9951008
stats_->read_buf_capacity += io_buf_.Capacity();
9961009

@@ -1920,8 +1933,7 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
19201933

19211934
// Removes this connection's contribution from the stats that stats_ points to:
// subtracts the read buffer capacity and decrements a connection counter.
// Diff view: the removed line decremented the single num_conns counter; the
// added line delegates to DecrNumConns().
void Connection::DecreaseStatsOnClose() {
19221935
stats_->read_buf_capacity -= io_buf_.Capacity();
1923-
1924-
--stats_->num_conns;
1936+
DecrNumConns();
19251937
}
19261938

19271939
void Connection::BreakOnce(uint32_t ev_mask) {
@@ -1993,6 +2005,20 @@ void Connection::MarkReadBufferConsumed() {
19932005
}
19942006
}
19952007

2008+
// Increments exactly one connection counter in *stats_: num_conns_main when
// IsMainOrMemcache() is true, num_conns_other otherwise.
// stats_ is dereferenced without a check, so it must be set before this runs.
void Connection::IncrNumConns() {
2009+
if (IsMainOrMemcache())
2010+
++stats_->num_conns_main;
2011+
else
2012+
++stats_->num_conns_other;
2013+
}
2014+
2015+
// Counterpart of IncrNumConns(): decrements num_conns_main when
// IsMainOrMemcache() is true, num_conns_other otherwise.
// NOTE(review): the bucket is re-evaluated on every call, so a balanced count
// relies on IsMainOrMemcache() giving the same answer at increment and
// decrement time — confirm the listener/is_main_ cannot change in between.
void Connection::DecrNumConns() {
2016+
if (IsMainOrMemcache())
2017+
--stats_->num_conns_main;
2018+
else
2019+
--stats_->num_conns_other;
2020+
}
2021+
19962022
void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
19972023
thread_queue_backpressure[tid].pipeline_queue_max_len = val;
19982024
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
@@ -2059,11 +2085,11 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
20592085

20602086
void ResetStats() {
20612087
auto& cstats = tl_facade_stats->conn_stats;
2062-
cstats.command_cnt = 0;
20632088
cstats.pipelined_cmd_cnt = 0;
20642089
cstats.conn_received_cnt = 0;
20652090
cstats.pipelined_cmd_cnt = 0;
2066-
cstats.command_cnt = 0;
2091+
cstats.command_cnt_main = 0;
2092+
cstats.command_cnt_other = 0;
20672093
cstats.io_read_cnt = 0;
20682094
cstats.io_read_bytes = 0;
20692095

src/facade/dragonfly_connection.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,10 @@ class Connection : public util::Connection {
261261

262262
bool IsMain() const;
263263

264+
// Returns true if the listener role is main, and also if the listener protocol is Memcached.
265+
// This method returns true for customer-facing listeners.
266+
bool IsMainOrMemcache() const;
267+
264268
void SetName(std::string name);
265269

266270
void SetLibName(std::string name);
@@ -409,6 +413,9 @@ class Connection : public util::Connection {
409413
io::Bytes NextBundleBuffer(size_t total_len);
410414
void MarkReadBufferConsumed();
411415

416+
void IncrNumConns();
417+
void DecrNumConns();
418+
412419
std::deque<MessageHandle> dispatch_q_; // dispatch queue
413420
util::fb2::CondVarAny cnd_; // dispatch queue waker
414421
util::fb2::Fiber async_fb_; // async fiber (if started)
@@ -474,6 +481,7 @@ class Connection : public util::Connection {
474481
bool is_http_ : 1;
475482
bool is_tls_ : 1;
476483
bool recv_provided_ : 1;
484+
bool is_main_ : 1;
477485
};
478486
};
479487
};

src/facade/facade.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
2020

2121
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
2222
// To break this code deliberately if we add/remove a field to this struct.
23-
static_assert(kSizeConnStats == 120u);
23+
static_assert(kSizeConnStats == 136u);
2424

2525
ADD(read_buf_capacity);
2626
ADD(dispatch_queue_entries);
@@ -29,11 +29,13 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
2929
ADD(pipeline_cmd_cache_bytes);
3030
ADD(io_read_cnt);
3131
ADD(io_read_bytes);
32-
ADD(command_cnt);
32+
ADD(command_cnt_main);
33+
ADD(command_cnt_other);
3334
ADD(pipelined_cmd_cnt);
3435
ADD(pipelined_cmd_latency);
3536
ADD(conn_received_cnt);
36-
ADD(num_conns);
37+
ADD(num_conns_main);
38+
ADD(num_conns_other);
3739
ADD(num_blocked_clients);
3840
ADD(num_migrations);
3941
ADD(num_recv_provided_calls);

src/facade/facade_types.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,14 @@ struct ConnectionStats {
9999
uint64_t io_read_cnt = 0;
100100
size_t io_read_bytes = 0;
101101

102-
uint64_t command_cnt = 0;
102+
uint64_t command_cnt_main = 0;
103+
uint64_t command_cnt_other = 0;
103104
uint64_t pipelined_cmd_cnt = 0;
104105
uint64_t pipelined_cmd_latency = 0; // in microseconds
105106
uint64_t conn_received_cnt = 0;
106107

107-
uint32_t num_conns = 0;
108+
uint32_t num_conns_main = 0;
109+
uint32_t num_conns_other = 0;
108110
uint32_t num_blocked_clients = 0;
109111
uint64_t num_migrations = 0;
110112
uint64_t num_recv_provided_calls = 0;

src/server/conn_context.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ ConnectionContext::ConnectionContext(facade::Connection* owner, acl::UserCredent
104104
: facade::ConnectionContext(owner) {
105105
if (owner) {
106106
skip_acl_validation = owner->IsPrivileged();
107+
has_main_or_memcache_listener = owner->IsMainOrMemcache();
107108
}
108109

109110
keys = std::move(cred.keys);
@@ -125,6 +126,9 @@ ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction
125126
skip_acl_validation = owner->skip_acl_validation;
126127
acl_db_idx = owner->acl_db_idx;
127128
ns = owner->ns;
129+
if (owner->conn()) {
130+
has_main_or_memcache_listener = owner->conn()->IsMainOrMemcache();
131+
}
128132
} else {
129133
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::NONE_COMMANDS);
130134
}

src/server/conn_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ class ConnectionContext : public facade::ConnectionContext {
317317
// Reference to a FlowInfo for this connection if from a master to a replica.
318318
FlowInfo* replication_flow = nullptr;
319319

320+
// True if the related connection is bound to the main listener or serves the memcached protocol.
321+
bool has_main_or_memcache_listener = false;
322+
320323
private:
321324
void EnableMonitoring(bool enable) {
322325
subscriptions++; // required to support the monitoring

src/server/dragonfly_test.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,4 +834,16 @@ TEST_F(DflyEngineTest, ReplicaofRejectOnLoad) {
834834
// To consider having a parameter in dragonfly engine controlling number of shards
835835
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
836836

837+
// Runs one SET and one GET, then checks how the collected metrics split the
// two commands and the connection gauges between the main/other buckets.
TEST_F(DflyEngineTest, CommandMetricLabels) {
838+
EXPECT_EQ(Run({"SET", "foo", "bar"}), "OK");
839+
EXPECT_EQ(Run({"GET", "foo"}), "bar");
840+
const Metrics metrics = GetMetrics();
841+
842+
// The test connection counts as other, so both commands are expected in
// command_cnt_other and none in command_cnt_main.
843+
EXPECT_EQ(metrics.facade_stats.conn_stats.command_cnt_other, 2);
844+
EXPECT_EQ(metrics.facade_stats.conn_stats.command_cnt_main, 0);
845+
// NOTE(review): both connection gauges are expected to be zero — presumably
// the test harness does not register a listener-backed connection; confirm.
EXPECT_EQ(metrics.facade_stats.conn_stats.num_conns_main, 0);
846+
EXPECT_EQ(metrics.facade_stats.conn_stats.num_conns_other, 0);
847+
}
848+
837849
} // namespace dfly

src/server/main_service.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1318,7 +1318,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
13181318
DispatchMonitor(cntx, cid, tail_args);
13191319
}
13201320

1321-
ServerState::tlocal()->RecordCmd();
1321+
ServerState::tlocal()->RecordCmd(cntx->has_main_or_memcache_listener);
13221322
Transaction* tx = cntx->transaction;
13231323
auto& info = cntx->conn_state.tracking_info_;
13241324
const bool is_read_only = cid->opt_mask() & CO::READONLY;

src/server/server_family.cc

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,8 +1283,11 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
12831283
const auto& conn_stats = m.facade_stats.conn_stats;
12841284
AppendMetricWithoutLabels("max_clients", "Maximal number of clients", GetFlag(FLAGS_maxclients),
12851285
MetricType::GAUGE, &resp->body());
1286-
AppendMetricWithoutLabels("connected_clients", "", conn_stats.num_conns, MetricType::GAUGE,
1287-
&resp->body());
1286+
AppendMetricHeader("connected_clients", "", MetricType::GAUGE, &resp->body());
1287+
AppendMetricValue("connected_clients", conn_stats.num_conns_main, {"listener"}, {"main"},
1288+
&resp->body());
1289+
AppendMetricValue("connected_clients", conn_stats.num_conns_other, {"listener"}, {"other"},
1290+
&resp->body());
12881291
AppendMetricWithoutLabels("client_read_buffer_bytes", "", conn_stats.read_buf_capacity,
12891292
MetricType::GAUGE, &resp->body());
12901293
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
@@ -1381,8 +1384,11 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
13811384
AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt,
13821385
MetricType::COUNTER, &resp->body());
13831386

1384-
AppendMetricWithoutLabels("commands_processed_total", "", conn_stats.command_cnt,
1385-
MetricType::COUNTER, &resp->body());
1387+
AppendMetricHeader("commands_processed_total", "", MetricType::COUNTER, &resp->body());
1388+
AppendMetricValue("commands_processed_total", conn_stats.command_cnt_main, {"listener"}, {"main"},
1389+
&resp->body());
1390+
AppendMetricValue("commands_processed_total", conn_stats.command_cnt_other, {"listener"},
1391+
{"other"}, &resp->body());
13861392
AppendMetricWithoutLabels("keyspace_hits_total", "", m.events.hits, MetricType::COUNTER,
13871393
&resp->body());
13881394
AppendMetricWithoutLabels("keyspace_misses_total", "", m.events.misses, MetricType::COUNTER,
@@ -1628,6 +1634,8 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
16281634
Metrics m = GetMetrics(&namespaces->GetDefaultNamespace());
16291635
uint64_t uptime = time(NULL) - start_time_;
16301636

1637+
const uint32_t total_conns =
1638+
m.facade_stats.conn_stats.num_conns_main + m.facade_stats.conn_stats.num_conns_other;
16311639
ADD_LINE(pid, getpid());
16321640
ADD_LINE(uptime, uptime);
16331641
ADD_LINE(time, now);
@@ -1637,7 +1645,7 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
16371645
ADD_LINE(rusage_user, utime);
16381646
ADD_LINE(rusage_system, systime);
16391647
ADD_LINE(max_connections, -1);
1640-
ADD_LINE(curr_connections, m.facade_stats.conn_stats.num_conns);
1648+
ADD_LINE(curr_connections, total_conns);
16411649
ADD_LINE(total_connections, -1);
16421650
ADD_LINE(rejected_connections, -1);
16431651
ADD_LINE(bytes_read, m.facade_stats.conn_stats.io_read_bytes);
@@ -2357,7 +2365,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
23572365
};
23582366

23592367
auto add_clients_info = [&] {
2360-
append("connected_clients", m.facade_stats.conn_stats.num_conns);
2368+
append("connected_clients",
2369+
m.facade_stats.conn_stats.num_conns_main + m.facade_stats.conn_stats.num_conns_other);
23612370
append("max_clients", GetFlag(FLAGS_maxclients));
23622371
append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity);
23632372
append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients);
@@ -2445,7 +2454,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
24452454
auto& reply_stats = m.facade_stats.reply_stats;
24462455

24472456
append("total_connections_received", conn_stats.conn_received_cnt);
2448-
append("total_commands_processed", conn_stats.command_cnt);
2457+
append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other);
24492458
append("instantaneous_ops_per_sec", m.qps);
24502459
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
24512460
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);

src/server/server_state.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,12 @@ class ServerState { // public struct - to allow initialization.
210210
return qps_.SumTail();
211211
}
212212

213-
// Records one executed command: bumps a command counter obtained through
// tl_connection_stats() and then calls qps_.Inc().
// Diff view: the removed version took no argument and incremented the single
// command_cnt; the added version takes is_main_conn and increments
// command_cnt_main when it is true, command_cnt_other otherwise.
void RecordCmd() {
214-
++tl_connection_stats()->command_cnt;
213+
void RecordCmd(const bool is_main_conn) {
214+
if (is_main_conn) {
215+
++tl_connection_stats()->command_cnt_main;
216+
} else {
217+
++tl_connection_stats()->command_cnt_other;
218+
}
215219
qps_.Inc();
216220
}
217221

0 commit comments

Comments
 (0)