Skip to content

Commit 6034f3c

Browse files
authored
Merge pull request #12638 from rabbitmq/amqp-connection-metrics
Expose AMQP connection metrics
2 parents fe587ae + 3db4a97 commit 6034f3c

File tree

10 files changed

+187
-77
lines changed

10 files changed

+187
-77
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 76 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-include_lib("kernel/include/logger.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212
-include_lib("amqp10_common/include/amqp10_types.hrl").
13+
-include("rabbit_amqp_reader.hrl").
1314
-include("rabbit_amqp.hrl").
1415

1516
-export([init/1,
@@ -79,7 +80,8 @@
7980
pending_recv :: boolean(),
8081
buf :: list(),
8182
buf_len :: non_neg_integer(),
82-
tracked_channels :: #{channel_number() => Session :: pid()}
83+
tracked_channels :: #{channel_number() => Session :: pid()},
84+
stats_timer :: rabbit_event:state()
8385
}).
8486

8587
-type state() :: #v1{}.
@@ -90,7 +92,7 @@
9092

9193
unpack_from_0_9_1(
9294
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
93-
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
95+
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
9496
Parent) ->
9597
logger:update_process_metadata(#{connection => ConnectionName}),
9698
#v1{parent = Parent,
@@ -106,6 +108,7 @@ unpack_from_0_9_1(
106108
tracked_channels = maps:new(),
107109
writer = none,
108110
connection_state = received_amqp3100,
111+
stats_timer = StatsTimer,
109112
connection = #v1_connection{
110113
name = ConnectionName,
111114
container_id = none,
@@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
201204
end
202205
end.
203206

207+
handle_other(emit_stats, State) ->
208+
emit_stats(State);
209+
handle_other(ensure_stats_timer, State) ->
210+
ensure_stats_timer(State);
204211
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
205212
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
206213
[Reason]),
@@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
247254
end,
248255
gen_server:reply(From, Reply),
249256
State;
250-
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
251-
State;
257+
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
258+
case ?IS_RUNNING(State) of
259+
true ->
260+
Infos = infos(?CONNECTION_EVENT_KEYS, State),
261+
rabbit_event:notify(connection_created, Infos, Ref),
262+
rabbit_event:init_stats_timer(State, #v1.stats_timer);
263+
false ->
264+
%% Ignore, we will emit a connection_created event once we start running.
265+
State
266+
end;
252267
handle_other(terminate_connection, _State) ->
253268
stop;
254269
handle_other({set_credential, Cred}, State) ->
@@ -527,6 +542,7 @@ handle_connection_frame(
527542
proplists:get_value(pid, Infos),
528543
Infos),
529544
ok = rabbit_event:notify(connection_created, Infos),
545+
ok = maybe_emit_stats(State),
530546
ok = rabbit_amqp1_0:register_connection(self()),
531547
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
532548
<<"LINK_PAIR_V1_0">>,
@@ -629,25 +645,26 @@ handle_input(handshake,
629645
switch_callback(State, {frame_header, amqp}, 8);
630646
handle_input({frame_header, Mode},
631647
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
632-
State) when DOff >= 2 ->
648+
State0) when DOff >= 2 ->
633649
case {Mode, Type} of
634650
{amqp, 0} -> ok;
635651
{sasl, 1} -> ok;
636-
_ -> throw({bad_1_0_header_type, Header, Mode})
652+
_ -> throw({bad_1_0_header_type, Header, Mode})
637653
end,
638-
MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size,
639-
if Size =:= 8 ->
640-
%% heartbeat
641-
State;
642-
Size > MaxFrameSize ->
643-
handle_exception(
644-
State, Channel, error_frame(
645-
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
646-
"frame size (~b bytes) > maximum frame size (~b bytes)",
647-
[Size, MaxFrameSize]));
648-
true ->
649-
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
650-
end;
654+
MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size,
655+
State = if Size =:= 8 ->
656+
%% heartbeat
657+
State0;
658+
Size > MaxFrameSize ->
659+
Err = error_frame(
660+
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
661+
"frame size (~b bytes) > maximum frame size (~b bytes)",
662+
[Size, MaxFrameSize]),
663+
handle_exception(State0, Channel, Err);
664+
true ->
665+
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
666+
end,
667+
ensure_stats_timer(State);
651668
handle_input({frame_header, _Mode}, Malformed, _State) ->
652669
throw({bad_1_0_header, Malformed});
653670
handle_input({frame_body, Mode, DOff, Channel},
@@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) ->
10131030
Val;
10141031
i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) ->
10151032
Val;
1016-
i(SockStat, S) when SockStat =:= recv_oct;
1017-
SockStat =:= recv_cnt;
1018-
SockStat =:= send_oct;
1019-
SockStat =:= send_cnt;
1020-
SockStat =:= send_pend ->
1021-
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
1022-
fun ([{_, I}]) -> I end, S);
1033+
i(SockStat, #v1{sock = Sock})
1034+
when SockStat =:= recv_oct;
1035+
SockStat =:= recv_cnt;
1036+
SockStat =:= send_oct;
1037+
SockStat =:= send_cnt;
1038+
SockStat =:= send_pend ->
1039+
case rabbit_net:getstat(Sock, [SockStat]) of
1040+
{ok, [{SockStat, Val}]} ->
1041+
Val;
1042+
{error, _} ->
1043+
''
1044+
end;
10231045
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
10241046
i(SSL, #v1{sock = Sock, proxy_socket = ProxySock})
10251047
when SSL =:= ssl_protocol;
@@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) ->
10451067
maps:size(Channels);
10461068
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
10471069
Max;
1070+
i(reductions = Item, _State) ->
1071+
{Item, Reductions} = erlang:process_info(self(), Item),
1072+
Reductions;
1073+
i(garbage_collection, _State) ->
1074+
rabbit_misc:get_gc_info(self());
10481075
i(Item, #v1{}) ->
10491076
throw({bad_argument, Item}).
10501077

1051-
%% From rabbit_reader
1052-
socket_info(Get, Select, #v1{sock = Sock}) ->
1053-
case Get(Sock) of
1054-
{ok, T} -> Select(T);
1055-
{error, _} -> ''
1056-
end.
1078+
maybe_emit_stats(State) ->
1079+
ok = rabbit_event:if_enabled(
1080+
State,
1081+
#v1.stats_timer,
1082+
fun() -> emit_stats(State) end).
1083+
1084+
emit_stats(State) ->
1085+
[{_, Pid},
1086+
{_, RecvOct},
1087+
{_, SendOct},
1088+
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
1089+
Infos = infos(?OTHER_METRICS, State),
1090+
rabbit_core_metrics:connection_stats(Pid, Infos),
1091+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
1092+
%% NB: Don't call ensure_stats_timer because it becomes expensive
1093+
%% if all idle non-hibernating connections emit stats.
1094+
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
1095+
1096+
ensure_stats_timer(State)
1097+
when ?IS_RUNNING(State) ->
1098+
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
1099+
ensure_stats_timer(State) ->
1100+
State.
10571101

10581102
ignore_maintenance({map, Properties}) ->
10591103
lists:member(
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-define(SIMPLE_METRICS, [pid,
8+
recv_oct,
9+
send_oct,
10+
reductions]).
11+
12+
-define(OTHER_METRICS, [recv_cnt,
13+
send_cnt,
14+
send_pend,
15+
state,
16+
channels,
17+
garbage_collection]).

deps/rabbit/src/rabbit_amqp_writer.erl

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
pending :: iolist(),
3232
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
3333
pending_size :: non_neg_integer(),
34-
monitored_sessions :: #{pid() => true}
34+
monitored_sessions :: #{pid() => true},
35+
stats_timer :: rabbit_event:state()
3536
}).
3637

3738
-define(HIBERNATE_AFTER, 6_000).
@@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
100101
reader = ReaderPid,
101102
pending = [],
102103
pending_size = 0,
103-
monitored_sessions = #{}},
104+
monitored_sessions = #{},
105+
stats_timer = rabbit_event:init_stats_timer()},
104106
process_flag(message_queue_data, off_heap),
105107
{ok, State}.
106108

@@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
123125
State = flush(State1),
124126
{reply, ok, State}.
125127

128+
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
129+
ReaderPid ! ensure_stats_timer,
130+
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
131+
no_reply(State);
126132
handle_info(timeout, State0) ->
127133
State = flush(State0),
128134
{noreply, State};
@@ -223,18 +229,19 @@ tcp_send(Sock, Data) ->
223229

224230
maybe_flush(State = #state{pending_size = PendingSize}) ->
225231
case PendingSize > ?FLUSH_THRESHOLD of
226-
true -> flush(State);
232+
true -> flush(State);
227233
false -> State
228234
end.
229235

230236
flush(State = #state{pending = []}) ->
231237
State;
232-
flush(State = #state{sock = Sock,
233-
pending = Pending}) ->
238+
flush(State0 = #state{sock = Sock,
239+
pending = Pending}) ->
234240
case rabbit_net:send(Sock, lists:reverse(Pending)) of
235241
ok ->
236-
State#state{pending = [],
237-
pending_size = 0};
242+
State = State0#state{pending = [],
243+
pending_size = 0},
244+
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
238245
{error, Reason} ->
239246
exit({writer, send_failed, Reason})
240247
end.

deps/rabbit/src/rabbit_reader.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
-include_lib("rabbit_common/include/rabbit_framing.hrl").
4444
-include_lib("rabbit_common/include/rabbit.hrl").
45+
-include("rabbit_amqp_reader.hrl").
4546

4647
-export([start_link/2, info/2, force_event_refresh/2,
4748
shutdown/2]).
@@ -116,10 +117,6 @@
116117
connection_blocked_message_sent
117118
}).
118119

119-
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
120-
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
121-
garbage_collection]).
122-
123120
-define(CREATION_EVENT_KEYS,
124121
[pid, name, port, peer_port, host,
125122
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
@@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
15821579
end;
15831580
i(garbage_collection, _State) ->
15841581
rabbit_misc:get_gc_info(self());
1585-
i(reductions, _State) ->
1586-
{reductions, Reductions} = erlang:process_info(self(), reductions),
1582+
i(reductions = Item, _State) ->
1583+
{Item, Reductions} = erlang:process_info(self(), Item),
15871584
Reductions;
15881585
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
15891586

@@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->
16231620

16241621
emit_stats(State) ->
16251622
[{_, Pid},
1626-
{_, Recv_oct},
1627-
{_, Send_oct},
1623+
{_, RecvOct},
1624+
{_, SendOct},
16281625
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
16291626
Infos = infos(?OTHER_METRICS, State),
16301627
rabbit_core_metrics:connection_stats(Pid, Infos),
1631-
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
1628+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
16321629
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
16331630
ensure_stats_timer(State1).
16341631

@@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16431640
pending_recv = PendingRecv,
16441641
helper_sup = {_HelperSup091, HelperSup10},
16451642
proxy_socket = ProxySocket,
1643+
stats_timer = StatsTimer,
16461644
connection = #connection{
16471645
name = Name,
16481646
host = Host,
@@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16511649
peer_port = PeerPort,
16521650
connected_at = ConnectedAt}}) ->
16531651
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
1654-
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
1652+
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.
16551653

16561654
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
16571655
log_hard_error(State, Channel, LogErr),

deps/rabbit_common/src/rabbit_core_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ connection_stats(Pid, Infos) ->
141141
ets:insert(connection_metrics, {Pid, Infos}),
142142
ok.
143143

144-
connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
144+
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
145145
%% Includes delete marker
146-
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
146+
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
147147
ok.
148148

149149
channel_created(Pid, Infos) ->

deps/rabbit_common/src/rabbit_event.erl

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
-include("rabbit.hrl").
1111

1212
-export([start_link/0]).
13-
-export([init_stats_timer/2, init_disabled_stats_timer/2,
13+
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
1414
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
1515
-export([stats_level/2, if_enabled/3]).
1616
-export([notify/2, notify/3, notify_if/3]).
@@ -89,23 +89,34 @@ start_link() ->
8989
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
9090
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.
9191

92-
init_stats_timer(C, P) ->
92+
-spec init_stats_timer() -> state().
93+
init_stats_timer() ->
9394
%% If the rabbit app is not loaded - use default none:5000
9495
StatsLevel = application:get_env(rabbit, collect_statistics, none),
95-
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
96-
setelement(P, C, #state{level = StatsLevel, interval = Interval,
97-
timer = undefined}).
96+
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
97+
#state{level = StatsLevel,
98+
interval = Interval,
99+
timer = undefined}.
100+
101+
init_stats_timer(C, P) ->
102+
State = init_stats_timer(),
103+
setelement(P, C, State).
98104

99105
init_disabled_stats_timer(C, P) ->
100-
setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
106+
State = #state{level = none,
107+
interval = 0,
108+
timer = undefined},
109+
setelement(P, C, State).
101110

102111
ensure_stats_timer(C, P, Msg) ->
103112
case element(P, C) of
104-
#state{level = Level, interval = Interval, timer = undefined} = State
113+
#state{level = Level,
114+
interval = Interval,
115+
timer = undefined} = State
105116
when Level =/= none ->
106117
TRef = erlang:send_after(Interval, self(), Msg),
107118
setelement(P, C, State#state{timer = TRef});
108-
#state{} ->
119+
_State ->
109120
C
110121
end.
111122

@@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) ->
156167
#event{type = Type,
157168
props = Props,
158169
reference = Ref,
159-
timestamp = os:system_time(milli_seconds)}.
170+
timestamp = os:system_time(millisecond)}.
160171

0 commit comments

Comments
 (0)