Skip to content

Commit c867f2f

Browse files
committed
WIP connection metrics
TODO: maybe_emit_stats/1 as done in rabbit_reader
1 parent 1969ae4 commit c867f2f

File tree

6 files changed

+87
-24
lines changed

6 files changed

+87
-24
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-include_lib("kernel/include/logger.hrl").
1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212
-include_lib("amqp10_common/include/amqp10_types.hrl").
13+
-include("rabbit_reader.hrl").
1314
-include("rabbit_amqp.hrl").
1415

1516
-export([init/1,
@@ -79,7 +80,8 @@
7980
pending_recv :: boolean(),
8081
buf :: list(),
8182
buf_len :: non_neg_integer(),
82-
tracked_channels :: #{channel_number() => Session :: pid()}
83+
tracked_channels :: #{channel_number() => Session :: pid()},
84+
stats_timer :: rabbit_event:state()
8385
}).
8486

8587
-type state() :: #v1{}.
@@ -90,7 +92,7 @@
9092

9193
unpack_from_0_9_1(
9294
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
93-
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
95+
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
9496
Parent) ->
9597
logger:update_process_metadata(#{connection => ConnectionName}),
9698
#v1{parent = Parent,
@@ -106,6 +108,7 @@ unpack_from_0_9_1(
106108
tracked_channels = maps:new(),
107109
writer = none,
108110
connection_state = received_amqp3100,
111+
stats_timer = StatsTimer,
109112
connection = #v1_connection{
110113
name = ConnectionName,
111114
container_id = none,
@@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
201204
end
202205
end.
203206

207+
handle_other(emit_stats, State) ->
208+
emit_stats(State);
209+
handle_other(ensure_stats_timer, State) ->
210+
ensure_stats_timer(State);
204211
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
205212
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
206213
[Reason]),
@@ -247,8 +254,17 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
247254
end,
248255
gen_server:reply(From, Reply),
249256
State;
250-
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
251-
State;
257+
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
258+
case ?IS_RUNNING(State) of
259+
true ->
260+
%%TODO test case
261+
Infos = infos(?CONNECTION_EVENT_KEYS, State),
262+
rabbit_event:notify(connection_created, Infos, Ref),
263+
rabbit_event:init_stats_timer(State, #v1.stats_timer);
264+
false ->
265+
%% Ignore, we will emit a connection_created event once we start running.
266+
State
267+
end;
252268
handle_other(terminate_connection, _State) ->
253269
stop;
254270
handle_other({set_credential, Cred}, State) ->
@@ -527,6 +543,7 @@ handle_connection_frame(
527543
proplists:get_value(pid, Infos),
528544
Infos),
529545
ok = rabbit_event:notify(connection_created, Infos),
546+
ok = maybe_emit_stats(State),
530547
ok = rabbit_amqp1_0:register_connection(self()),
531548
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
532549
<<"LINK_PAIR_V1_0">>,
@@ -646,7 +663,8 @@ handle_input({frame_header, Mode},
646663
"frame size (~b bytes) > maximum frame size (~b bytes)",
647664
[Size, MaxFrameSize]));
648665
true ->
649-
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
666+
State1 = switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8),
667+
ensure_stats_timer(State1)
650668
end;
651669
handle_input({frame_header, _Mode}, Malformed, _State) ->
652670
throw({bad_1_0_header, Malformed});
@@ -1045,6 +1063,11 @@ i(channels, #v1{tracked_channels = Channels}) ->
10451063
maps:size(Channels);
10461064
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
10471065
Max;
1066+
i(reductions = Item, _State) ->
1067+
{Item, Reductions} = erlang:process_info(self(), Item),
1068+
Reductions;
1069+
i(garbage_collection, _State) ->
1070+
rabbit_misc:get_gc_info(self());
10481071
i(Item, #v1{}) ->
10491072
throw({bad_argument, Item}).
10501073

@@ -1055,6 +1078,30 @@ socket_info(Get, Select, #v1{sock = Sock}) ->
10551078
{error, _} -> ''
10561079
end.
10571080

1081+
maybe_emit_stats(State) ->
1082+
ok = rabbit_event:if_enabled(
1083+
State,
1084+
#v1.stats_timer,
1085+
fun() -> emit_stats(State) end).
1086+
1087+
emit_stats(State) ->
1088+
[{_, Pid},
1089+
{_, RecvOct},
1090+
{_, SendOct},
1091+
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
1092+
Infos = infos(?OTHER_METRICS, State),
1093+
rabbit_core_metrics:connection_stats(Pid, Infos),
1094+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
1095+
%% NB: Don't call ensure_stats_timer because it becomes expensive
1096+
%% if all idle non-hibernating connections emit stats.
1097+
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
1098+
1099+
ensure_stats_timer(State)
1100+
when ?IS_RUNNING(State) ->
1101+
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
1102+
ensure_stats_timer(State) ->
1103+
State.
1104+
10581105
ignore_maintenance({map, Properties}) ->
10591106
lists:member(
10601107
{{symbol, <<"ignore-maintenance">>}, true},

deps/rabbit/src/rabbit_amqp_writer.erl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131
pending :: iolist(),
3232
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
3333
pending_size :: non_neg_integer(),
34-
monitored_sessions :: #{pid() => true}
34+
monitored_sessions :: #{pid() => true},
35+
stats_timer :: rabbit_event:state()
3536
}).
3637

3738
-define(HIBERNATE_AFTER, 6_000).
@@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
100101
reader = ReaderPid,
101102
pending = [],
102103
pending_size = 0,
103-
monitored_sessions = #{}},
104+
monitored_sessions = #{},
105+
stats_timer = rabbit_event:init_stats_timer()},
104106
process_flag(message_queue_data, off_heap),
105107
{ok, State}.
106108

@@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
123125
State = flush(State1),
124126
{reply, ok, State}.
125127

128+
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
129+
ReaderPid ! ensure_stats_timer,
130+
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
131+
no_reply(State);
126132
handle_info(timeout, State0) ->
127133
State = flush(State0),
128134
{noreply, State};
@@ -229,12 +235,13 @@ maybe_flush(State = #state{pending_size = PendingSize}) ->
229235

230236
flush(State = #state{pending = []}) ->
231237
State;
232-
flush(State = #state{sock = Sock,
233-
pending = Pending}) ->
238+
flush(State0 = #state{sock = Sock,
239+
pending = Pending}) ->
234240
case rabbit_net:send(Sock, lists:reverse(Pending)) of
235241
ok ->
236-
State#state{pending = [],
237-
pending_size = 0};
242+
State = State0#state{pending = [],
243+
pending_size = 0},
244+
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
238245
{error, Reason} ->
239246
exit({writer, send_failed, Reason})
240247
end.

deps/rabbit/src/rabbit_reader.erl

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
-include_lib("rabbit_common/include/rabbit_framing.hrl").
4444
-include_lib("rabbit_common/include/rabbit.hrl").
45+
-include("rabbit_reader.hrl").
4546

4647
-export([start_link/2, info/2, force_event_refresh/2,
4748
shutdown/2]).
@@ -116,10 +117,6 @@
116117
connection_blocked_message_sent
117118
}).
118119

119-
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
120-
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
121-
garbage_collection]).
122-
123120
-define(CREATION_EVENT_KEYS,
124121
[pid, name, port, peer_port, host,
125122
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
@@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
15821579
end;
15831580
i(garbage_collection, _State) ->
15841581
rabbit_misc:get_gc_info(self());
1585-
i(reductions, _State) ->
1586-
{reductions, Reductions} = erlang:process_info(self(), reductions),
1582+
i(reductions = Item, _State) ->
1583+
{Item, Reductions} = erlang:process_info(self(), Item),
15871584
Reductions;
15881585
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
15891586

@@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->
16231620

16241621
emit_stats(State) ->
16251622
[{_, Pid},
1626-
{_, Recv_oct},
1627-
{_, Send_oct},
1623+
{_, RecvOct},
1624+
{_, SendOct},
16281625
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
16291626
Infos = infos(?OTHER_METRICS, State),
16301627
rabbit_core_metrics:connection_stats(Pid, Infos),
1631-
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
1628+
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
16321629
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
16331630
ensure_stats_timer(State1).
16341631

@@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16431640
pending_recv = PendingRecv,
16441641
helper_sup = {_HelperSup091, HelperSup10},
16451642
proxy_socket = ProxySocket,
1643+
stats_timer = StatsTimer,
16461644
connection = #connection{
16471645
name = Name,
16481646
host = Host,
@@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16511649
peer_port = PeerPort,
16521650
connected_at = ConnectedAt}}) ->
16531651
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
1654-
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
1652+
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.
16551653

16561654
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
16571655
log_hard_error(State, Channel, LogErr),

deps/rabbit/src/rabbit_reader.hrl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
%% This Source Code Form is subject to the terms of the Mozilla Public
3+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
4+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
5+
%%
6+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
7+
8+
-define(SIMPLE_METRICS,
9+
[pid, recv_oct, send_oct, reductions]).
10+
-define(OTHER_METRICS,
11+
[recv_cnt, send_cnt, send_pend, state, channels, garbage_collection]).

deps/rabbit_common/src/rabbit_core_metrics.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ connection_stats(Pid, Infos) ->
168168
ets:insert(connection_metrics, {Pid, Infos}),
169169
ok.
170170

171-
connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
171+
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
172172
%% Includes delete marker
173-
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
173+
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
174174
ok.
175175

176176
-spec session_begun(rabbit_types:infos()) -> ok.

deps/rabbit_common/src/rabbit_event.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ ensure_stats_timer(C, P, Msg) ->
110110
when Level =/= none ->
111111
TRef = erlang:send_after(Interval, self(), Msg),
112112
setelement(P, C, State#state{timer = TRef});
113-
#state{} ->
113+
_State ->
114114
C
115115
end.
116116

0 commit comments

Comments
 (0)