Skip to content

Commit 3f9b3b6

Browse files
committed
Expose AMQP connection metrics
Expose the same metrics for AMQP 1.0 connections as for AMQP 0.9.1 connections. Display the following AMQP 1.0 metrics on the Management UI: * Network bytes per second from/to client on connections page * Number of sessions/channels on connections page * Network bytes per second from/to client graph on connection page * Reductions graph on connection page * Garbage colletion info on connection page Expose the following AMQP 1.0 per-object Prometheus metrics: * rabbitmq_connection_incoming_bytes_total * rabbitmq_connection_outgoing_bytes_total * rabbitmq_connection_process_reductions_total * rabbitmq_connection_incoming_packets_total * rabbitmq_connection_outgoing_packets_total * rabbitmq_connection_pending_packets * rabbitmq_connection_channels The rabbit_amqp_writer proc: * notifies the rabbit_amqp_reader proc if it sent frames * hibernates eventually if it doesn't send any frames The rabbit_amqp_reader proc: * does not emit stats (update ETS tables) if no frames are received or sent to save resources when there are many idle connections.
1 parent ef2c8df commit 3f9b3b6

File tree

10 files changed

+188
-78
lines changed

10 files changed

+188
-78
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-include_lib("kernel/include/logger.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212
-include_lib("amqp10_common/include/amqp10_types.hrl").
13+
-include("rabbit_amqp_reader.hrl").
1314
-include("rabbit_amqp.hrl").
1415

1516
-export([init/1,
@@ -79,7 +80,8 @@
7980
pending_recv :: boolean(),
8081
buf :: list(),
8182
buf_len :: non_neg_integer(),
82-
tracked_channels :: #{channel_number() => Session :: pid()}
83+
tracked_channels :: #{channel_number() => Session :: pid()},
84+
stats_timer :: rabbit_event:state()
8385
}).
8486

8587
-type state() :: #v1{}.
@@ -90,7 +92,7 @@
9092

9193
unpack_from_0_9_1(
9294
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
93-
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
95+
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
9496
Parent) ->
9597
logger:update_process_metadata(#{connection => ConnectionName}),
9698
#v1{parent = Parent,
@@ -106,6 +108,7 @@ unpack_from_0_9_1(
106108
tracked_channels = maps:new(),
107109
writer = none,
108110
connection_state = received_amqp3100,
111+
stats_timer = StatsTimer,
109112
connection = #v1_connection{
110113
name = ConnectionName,
111114
container_id = none,
@@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
201204
end
202205
end.
203206

207+
handle_other(emit_stats, State) ->
208+
emit_stats(State);
209+
handle_other(ensure_stats_timer, State) ->
210+
ensure_stats_timer(State);
204211
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
205212
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
206213
[Reason]),
@@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
247254
end,
248255
gen_server:reply(From, Reply),
249256
State;
250-
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
251-
State;
257+
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
258+
case ?IS_RUNNING(State) of
259+
true ->
260+
Infos = infos(?CONNECTION_EVENT_KEYS, State),
261+
rabbit_event:notify(connection_created, Infos, Ref),
262+
rabbit_event:init_stats_timer(State, #v1.stats_timer);
263+
false ->
264+
%% Ignore, we will emit a connection_created event once we start running.
265+
State
266+
end;
252267
handle_other(terminate_connection, _State) ->
253268
stop;
254269
handle_other({set_credential, Cred}, State) ->
@@ -527,6 +542,7 @@ handle_connection_frame(
527542
proplists:get_value(pid, Infos),
528543
Infos),
529544
ok = rabbit_event:notify(connection_created, Infos),
545+
ok = maybe_emit_stats(State),
530546
ok = rabbit_amqp1_0:register_connection(self()),
531547
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
532548
<<"LINK_PAIR_V1_0">>,
@@ -629,25 +645,26 @@ handle_input(handshake,
629645
switch_callback(State, {frame_header, amqp}, 8);
630646
handle_input({frame_header, Mode},
631647
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
632-
State) when DOff >= 2 ->
648+
State0) when DOff >= 2 ->
633649
case {Mode, Type} of
634650
{amqp, 0} -> ok;
635651
{sasl, 1} -> ok;
636-
_ -> throw({bad_1_0_header_type, Header, Mode})
652+
_ -> throw({bad_1_0_header_type, Header, Mode})
637653
end,
638-
MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size,
639-
if Size =:= 8 ->
640-
%% heartbeat
641-
State;
642-
Size > MaxFrameSize ->
643-
handle_exception(
644-
State, Channel, error_frame(
645-
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
646-
"frame size (~b bytes) > maximum frame size (~b bytes)",
647-
[Size, MaxFrameSize]));
648-
true ->
649-
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
650-
end;
654+
MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size,
655+
State = if Size =:= 8 ->
656+
%% heartbeat
657+
State0;
658+
Size > MaxFrameSize ->
659+
Err = error_frame(
660+
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
661+
"frame size (~b bytes) > maximum frame size (~b bytes)",
662+
[Size, MaxFrameSize]),
663+
handle_exception(State0, Channel, Err);
664+
true ->
665+
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
666+
end,
667+
ensure_stats_timer(State);
651668
handle_input({frame_header, _Mode}, Malformed, _State) ->
652669
throw({bad_1_0_header, Malformed});
653670
handle_input({frame_body, Mode, DOff, Channel},
@@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) ->
10131030
Val;
10141031
i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) ->
10151032
Val;
1016-
i(SockStat, S) when SockStat =:= recv_oct;
1017-
SockStat =:= recv_cnt;
1018-
SockStat =:= send_oct;
1019-
SockStat =:= send_cnt;
1020-
SockStat =:= send_pend ->
1021-
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
1022-
fun ([{_, I}]) -> I end, S);
1033+
i(SockStat, #v1{sock = Sock})
1034+
when SockStat =:= recv_oct;
1035+
SockStat =:= recv_cnt;
1036+
SockStat =:= send_oct;
1037+
SockStat =:= send_cnt;
1038+
SockStat =:= send_pend ->
1039+
case rabbit_net:getstat(Sock, [SockStat]) of
1040+
{ok, [{SockStat, Val}]} ->
1041+
Val;
1042+
{error, _} ->
1043+
''
1044+
end;
10231045
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
10241046
i(SSL, #v1{sock = Sock, proxy_socket = ProxySock})
10251047
when SSL =:= ssl_protocol;
@@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) ->
10451067
maps:size(Channels);
10461068
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
10471069
Max;
1070+
i(reductions = Item, _State) ->
1071+
{Item, Reductions} = erlang:process_info(self(), Item),
1072+
Reductions;
1073+
i(garbage_collection, _State) ->
1074+
rabbit_misc:get_gc_info(self());
10481075
i(Item, #v1{}) ->
10491076
throw({bad_argument, Item}).
10501077

1051-
%% From rabbit_reader
1052-
socket_info(Get, Select, #v1{sock = Sock}) ->
1053-
case Get(Sock) of
1054-
{ok, T} -> Select(T);
1055-
{error, _} -> ''
1056-
end.
1078+
maybe_emit_stats(State) ->
1079+
ok = rabbit_event:if_enabled(
1080+
State,
1081+
#v1.stats_timer,
1082+
fun() -> emit_stats(State) end).
1083+
1084+
emit_stats(State) ->
1085+
[{_, Pid},
1086+
{_, RecvOct},
1087+
{_, SendOct},
1088+
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
1089+
Infos = infos(?OTHER_METRICS, State),
1090+
rabbit_core_metrics:connection_stats(Pid, Infos),
1091+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
1092+
%% NB: Don't call ensure_stats_timer because it becomes expensive
1093+
%% if all idle non-hibernating connections emit stats.
1094+
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
1095+
1096+
ensure_stats_timer(State)
1097+
when ?IS_RUNNING(State) ->
1098+
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
1099+
ensure_stats_timer(State) ->
1100+
State.
10571101

10581102
ignore_maintenance({map, Properties}) ->
10591103
lists:member(
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-define(SIMPLE_METRICS, [pid,
8+
recv_oct,
9+
send_oct,
10+
reductions]).
11+
12+
-define(OTHER_METRICS, [recv_cnt,
13+
send_cnt,
14+
send_pend,
15+
state,
16+
channels,
17+
garbage_collection]).

deps/rabbit/src/rabbit_amqp_writer.erl

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
pending :: iolist(),
3232
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
3333
pending_size :: non_neg_integer(),
34-
monitored_sessions :: #{pid() => true}
34+
monitored_sessions :: #{pid() => true},
35+
stats_timer :: rabbit_event:state()
3536
}).
3637

3738
-define(HIBERNATE_AFTER, 6_000).
@@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
100101
reader = ReaderPid,
101102
pending = [],
102103
pending_size = 0,
103-
monitored_sessions = #{}},
104+
monitored_sessions = #{},
105+
stats_timer = rabbit_event:init_stats_timer()},
104106
process_flag(message_queue_data, off_heap),
105107
{ok, State}.
106108

@@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
123125
State = flush(State1),
124126
{reply, ok, State}.
125127

128+
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
129+
ReaderPid ! ensure_stats_timer,
130+
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
131+
no_reply(State);
126132
handle_info(timeout, State0) ->
127133
State = flush(State0),
128134
{noreply, State};
@@ -223,18 +229,19 @@ tcp_send(Sock, Data) ->
223229

224230
maybe_flush(State = #state{pending_size = PendingSize}) ->
225231
case PendingSize > ?FLUSH_THRESHOLD of
226-
true -> flush(State);
232+
true -> flush(State);
227233
false -> State
228234
end.
229235

230236
flush(State = #state{pending = []}) ->
231237
State;
232-
flush(State = #state{sock = Sock,
233-
pending = Pending}) ->
238+
flush(State0 = #state{sock = Sock,
239+
pending = Pending}) ->
234240
case rabbit_net:send(Sock, lists:reverse(Pending)) of
235241
ok ->
236-
State#state{pending = [],
237-
pending_size = 0};
242+
State = State0#state{pending = [],
243+
pending_size = 0},
244+
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
238245
{error, Reason} ->
239246
exit({writer, send_failed, Reason})
240247
end.

deps/rabbit/src/rabbit_reader.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
-include_lib("rabbit_common/include/rabbit_framing.hrl").
4444
-include_lib("rabbit_common/include/rabbit.hrl").
45+
-include("rabbit_amqp_reader.hrl").
4546

4647
-export([start_link/2, info/2, force_event_refresh/2,
4748
shutdown/2]).
@@ -116,10 +117,6 @@
116117
connection_blocked_message_sent
117118
}).
118119

119-
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
120-
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
121-
garbage_collection]).
122-
123120
-define(CREATION_EVENT_KEYS,
124121
[pid, name, port, peer_port, host,
125122
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
@@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
15821579
end;
15831580
i(garbage_collection, _State) ->
15841581
rabbit_misc:get_gc_info(self());
1585-
i(reductions, _State) ->
1586-
{reductions, Reductions} = erlang:process_info(self(), reductions),
1582+
i(reductions = Item, _State) ->
1583+
{Item, Reductions} = erlang:process_info(self(), Item),
15871584
Reductions;
15881585
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
15891586

@@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->
16231620

16241621
emit_stats(State) ->
16251622
[{_, Pid},
1626-
{_, Recv_oct},
1627-
{_, Send_oct},
1623+
{_, RecvOct},
1624+
{_, SendOct},
16281625
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
16291626
Infos = infos(?OTHER_METRICS, State),
16301627
rabbit_core_metrics:connection_stats(Pid, Infos),
1631-
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
1628+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
16321629
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
16331630
ensure_stats_timer(State1).
16341631

@@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16431640
pending_recv = PendingRecv,
16441641
helper_sup = {_HelperSup091, HelperSup10},
16451642
proxy_socket = ProxySocket,
1643+
stats_timer = StatsTimer,
16461644
connection = #connection{
16471645
name = Name,
16481646
host = Host,
@@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16511649
peer_port = PeerPort,
16521650
connected_at = ConnectedAt}}) ->
16531651
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
1654-
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
1652+
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.
16551653

16561654
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
16571655
log_hard_error(State, Channel, LogErr),

deps/rabbit_common/src/rabbit_core_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ connection_stats(Pid, Infos) ->
141141
ets:insert(connection_metrics, {Pid, Infos}),
142142
ok.
143143

144-
connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
144+
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
145145
%% Includes delete marker
146-
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
146+
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
147147
ok.
148148

149149
channel_created(Pid, Infos) ->

deps/rabbit_common/src/rabbit_event.erl

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
-include("rabbit.hrl").
1111

1212
-export([start_link/0]).
13-
-export([init_stats_timer/2, init_disabled_stats_timer/2,
13+
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
1414
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
1515
-export([stats_level/2, if_enabled/3]).
1616
-export([notify/2, notify/3, notify_if/3]).
@@ -89,23 +89,34 @@ start_link() ->
8989
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
9090
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.
9191

92-
init_stats_timer(C, P) ->
92+
-spec init_stats_timer() -> state().
93+
init_stats_timer() ->
9394
%% If the rabbit app is not loaded - use default none:5000
9495
StatsLevel = application:get_env(rabbit, collect_statistics, none),
95-
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
96-
setelement(P, C, #state{level = StatsLevel, interval = Interval,
97-
timer = undefined}).
96+
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
97+
#state{level = StatsLevel,
98+
interval = Interval,
99+
timer = undefined}.
100+
101+
init_stats_timer(C, P) ->
102+
State = init_stats_timer(),
103+
setelement(P, C, State).
98104

99105
init_disabled_stats_timer(C, P) ->
100-
setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
106+
State = #state{level = none,
107+
interval = 0,
108+
timer = undefined},
109+
setelement(P, C, State).
101110

102111
ensure_stats_timer(C, P, Msg) ->
103112
case element(P, C) of
104-
#state{level = Level, interval = Interval, timer = undefined} = State
113+
#state{level = Level,
114+
interval = Interval,
115+
timer = undefined} = State
105116
when Level =/= none ->
106117
TRef = erlang:send_after(Interval, self(), Msg),
107118
setelement(P, C, State#state{timer = TRef});
108-
#state{} ->
119+
_State ->
109120
C
110121
end.
111122

@@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) ->
156167
#event{type = Type,
157168
props = Props,
158169
reference = Ref,
159-
timestamp = os:system_time(milli_seconds)}.
170+
timestamp = os:system_time(millisecond)}.
160171

0 commit comments

Comments
 (0)