Skip to content

Commit 624fa3b

Browse files
authored
server: metrics to track stored command sizes (#5764)
* server: Track size of stored commands in exec info body The stored command size is tracked in a field which will be used to update metrics. A new field is introduced instead of just calling UsedMemory on exec info so we do not have to iterate over the entire stored commands vector each time we read the used memory. Signed-off-by: Abhijat Malviya <[email protected]> * server: Add metric for stored commands used memory Signed-off-by: Abhijat Malviya <[email protected]>
1 parent 28f7304 commit 624fa3b

File tree

7 files changed

+62
-4
lines changed

7 files changed

+62
-4
lines changed

src/server/conn_context.cc

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,18 @@ size_t ConnectionState::ExecInfo::UsedMemory() const {
229229
return dfly::HeapSize(body) + dfly::HeapSize(watched_keys);
230230
}
231231

232+
void ConnectionState::ExecInfo::AddStoredCmd(const CommandId* cid, bool own_args, CmdArgList args) {
233+
body.emplace_back(cid, own_args, args);
234+
stored_cmd_bytes += body.back().UsedMemory();
235+
}
236+
237+
size_t ConnectionState::ExecInfo::ClearStoredCmds() {
238+
const size_t used = GetStoredCmdBytes();
239+
vector<StoredCmd>{}.swap(body);
240+
stored_cmd_bytes = 0;
241+
return used;
242+
}
243+
232244
size_t ConnectionState::ScriptInfo::UsedMemory() const {
233245
return dfly::HeapSize(lock_tags) + async_cmds_heap_mem;
234246
}
@@ -307,7 +319,8 @@ vector<unsigned> ConnectionContext::ChangeSubscriptions(CmdArgList channels, boo
307319
void ConnectionState::ExecInfo::Clear() {
308320
DCHECK(!preborrowed_interpreter); // Must have been released properly
309321
state = EXEC_INACTIVE;
310-
vector<StoredCmd>{}.swap(body);
322+
const size_t cleared_size = ClearStoredCmds();
323+
ServerState::tlocal()->stats.stored_cmd_bytes -= cleared_size;
311324
is_write = false;
312325
ClearWatched();
313326
}

src/server/conn_context.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ struct ConnectionState {
8686

8787
size_t UsedMemory() const;
8888

89+
// Adds a StoredCmd and updates the stored_cmds_size
90+
void AddStoredCmd(const CommandId* cid, bool own_args, CmdArgList args);
91+
92+
// Empties the body vector and resets stored_cmds_size to 0. Returns the size before data was
93+
// cleared.
94+
size_t ClearStoredCmds();
95+
96+
// Returns memory used by the body field without iterating over each stored command
97+
size_t GetStoredCmdBytes() const {
98+
return stored_cmd_bytes + body.capacity() * sizeof(StoredCmd);
99+
}
100+
89101
ExecState state = EXEC_INACTIVE;
90102
std::vector<StoredCmd> body;
91103
bool is_write = false;
@@ -99,6 +111,10 @@ struct ConnectionState {
99111
// executing the multi transaction, which can create deadlocks by blocking other transactions
100112
// that already borrowed all available interpreters but wait for keys to be unlocked.
101113
Interpreter* preborrowed_interpreter = nullptr;
114+
115+
// The total size of all stored commands kept in "body". Does not include memory allocated by
116+
// the "body" vector.
117+
size_t stored_cmd_bytes = 0;
102118
};
103119

104120
// Lua-script related data.

src/server/main_service.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1346,9 +1346,12 @@ DispatchResult Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder
13461346
bool is_trans_cmd = CO::IsTransKind(cid->name());
13471347
if (dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd) {
13481348
// TODO: protect against aggregating huge transactions.
1349-
dfly_cntx->conn_state.exec_info.body.emplace_back(cid, true, args_no_cmd);
1349+
auto& exec_info = dfly_cntx->conn_state.exec_info;
1350+
const size_t old_size = exec_info.GetStoredCmdBytes();
1351+
exec_info.AddStoredCmd(cid, true, args_no_cmd);
1352+
etl.stats.stored_cmd_bytes += exec_info.GetStoredCmdBytes() - old_size;
13501353
if (cid->IsWriteOnly()) {
1351-
dfly_cntx->conn_state.exec_info.is_write = true;
1354+
exec_info.is_write = true;
13521355
}
13531356
builder->SendSimpleString("QUEUED");
13541357
return DispatchResult::OK;

src/server/multi_test.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,4 +1223,22 @@ TEST_F(MultiTest, ForceAtomicityFlag) {
12231223
EXPECT_THAT(Run({"eval", kScript, "0"}), ErrArg("undeclared"));
12241224
}
12251225

1226+
TEST_F(MultiTest, StoredCmdBytesMetric) {
1227+
ASSERT_EQ(GetMetrics().coordinator_stats.stored_cmd_bytes, 0);
1228+
1229+
RespExpr resp = Run({"multi"});
1230+
ASSERT_EQ(resp, "OK");
1231+
1232+
for (auto i = 0; i < 100; ++i) {
1233+
ASSERT_EQ(Run({"get", kKey1}), "QUEUED");
1234+
}
1235+
1236+
ASSERT_GT(GetMetrics().coordinator_stats.stored_cmd_bytes, 0);
1237+
1238+
resp = Run({"exec"});
1239+
ASSERT_THAT(resp, ArrLen(100));
1240+
ASSERT_THAT(resp.GetVec(), Contains(ArgType(RespExpr::NIL)).Times(100));
1241+
ASSERT_EQ(GetMetrics().coordinator_stats.stored_cmd_bytes, 0);
1242+
}
1243+
12261244
} // namespace dfly

src/server/server_family.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1778,6 +1778,9 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
17781778
AppendMetricValue("memory_by_class_bytes", total.obj_memory_usage, {"class"}, {"object_used"},
17791779
&memory_by_class_bytes);
17801780

1781+
AppendMetricValue("memory_by_class_bytes", m.coordinator_stats.stored_cmd_bytes, {"class"},
1782+
{"conn_stored_commands"}, &memory_by_class_bytes);
1783+
17811784
// Command stats
17821785
if (!m.cmd_stats_map.empty()) {
17831786
string command_metrics;

src/server/server_state.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ ServerState::Stats::Stats(unsigned num_shards)
5353
}
5454

5555
ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
56-
static_assert(sizeof(Stats) == 24 * 8, "Stats size mismatch");
56+
static_assert(sizeof(Stats) == 25 * 8, "Stats size mismatch");
5757

5858
#define ADD(x) this->x += (other.x)
5959

@@ -95,6 +95,8 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
9595
} else {
9696
this->squash_width_freq_arr = other.squash_width_freq_arr;
9797
}
98+
99+
ADD(stored_cmd_bytes);
98100
return *this;
99101
#undef ADD
100102
}

src/server/server_state.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ class ServerState { // public struct - to allow initialization.
136136
uint32_t conn_timeout_events = 0;
137137
uint64_t psync_requests_total = 0;
138138
std::valarray<uint64_t> tx_width_freq_arr, squash_width_freq_arr;
139+
140+
// Memory size of stored commands during multi-exec in connections
141+
size_t stored_cmd_bytes = 0;
139142
};
140143

141144
// Unsafe version.

0 commit comments

Comments
 (0)