Skip to content

Commit 72b45f5

Browse files
Merge pull request #14798 from rabbitmq/mergify/bp/v4.2.x/pr-14795
Handle multiple alarms consistently in protocol readers (backport #14795)
2 parents f1fff8d + af6d29d commit 72b45f5

File tree

4 files changed

+55
-30
lines changed

4 files changed

+55
-30
lines changed

deps/rabbit/src/rabbit_reader.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
335335
throttle = #throttle{
336336
last_blocked_at = never,
337337
should_block = false,
338-
blocked_by = sets:new(),
338+
blocked_by = sets:new([{version, 2}]),
339339
connection_blocked_message_sent = false
340340
},
341341
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock)},
@@ -1310,7 +1310,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
13101310
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
13111311

13121312
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
1313-
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms]),
1313+
BlockedBy = sets:from_list([{resource, Alarm} || Alarm <- Alarms], [{version, 2}]),
13141314
Throttle1 = Throttle#throttle{blocked_by = BlockedBy},
13151315

13161316
{ok, ChannelSupSupPid} =

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3737
rabbit_mqtt_processor:state(),
3838
connection_state :: running | blocked,
39-
conserve :: boolean(),
39+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
4040
stats_timer :: rabbit_event:state(),
4141
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4242
conn_name :: binary()
@@ -53,8 +53,8 @@ start_link(Ref, _Transport, []) ->
5353
-spec conserve_resources(pid(),
5454
rabbit_alarm:resource_alarm_source(),
5555
rabbit_alarm:resource_alert()) -> ok.
56-
conserve_resources(Pid, _, {_, Conserve, _}) ->
57-
Pid ! {conserve_resources, Conserve},
56+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
57+
Pid ! {conserve_resources, Source, Conserve},
5858
ok.
5959

6060
-spec info(pid(), rabbit_types:info_keys()) ->
@@ -78,15 +78,15 @@ init(Ref) ->
7878
{ok, ConnStr} ->
7979
ConnName = rabbit_data_coercion:to_binary(ConnStr),
8080
?LOG_DEBUG("MQTT accepting TCP connection ~tp (~ts)", [self(), ConnName]),
81-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
81+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
8282
LoginTimeout = application:get_env(?APP_NAME, login_timeout, 10_000),
8383
erlang:send_after(LoginTimeout, self(), login_timeout),
8484
State0 = #state{socket = RealSocket,
8585
proxy_socket = rabbit_net:maybe_get_proxy_socket(Sock),
8686
conn_name = ConnName,
8787
await_recv = false,
8888
connection_state = running,
89-
conserve = false,
89+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
9090
parse_state = rabbit_mqtt_packet:init_state(),
9191
stats_timer = rabbit_event:init_stats_timer()},
9292
State = control_throttle(State0),
@@ -185,9 +185,16 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
185185
when Tag =:= tcp_error; Tag =:= ssl_error ->
186186
network_error(Reason, State);
187187

188-
handle_info({conserve_resources, Conserve}, State) ->
188+
handle_info({conserve_resources, Source, Conserve},
189+
#state{blocked_by = BlockedBy0} = State) ->
190+
BlockedBy = case Conserve of
191+
true ->
192+
sets:add_element(Source, BlockedBy0);
193+
false ->
194+
sets:del_element(Source, BlockedBy0)
195+
end,
189196
maybe_process_deferred_recv(
190-
control_throttle(State #state{ conserve = Conserve }));
197+
control_throttle(State #state{ blocked_by = BlockedBy }));
191198

192199
handle_info({bump_credit, Msg}, State) ->
193200
credit_flow:handle_bump_msg(Msg),
@@ -417,10 +424,11 @@ run_socket(State = #state{ socket = Sock }) ->
417424
State#state{ await_recv = true }.
418425

419426
control_throttle(State = #state{connection_state = ConnState,
420-
conserve = Conserve,
427+
blocked_by = BlockedBy,
421428
proc_state = PState,
422429
keepalive = KState
423430
}) ->
431+
Conserve = not sets:is_empty(BlockedBy),
424432
Throttle = case PState of
425433
connect_packet_unprocessed -> Conserve;
426434
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)
@@ -537,7 +545,7 @@ format_state(#state{socket = Socket,
537545
parse_state = _,
538546
proc_state = PState,
539547
connection_state = ConnectionState,
540-
conserve = Conserve,
548+
blocked_by = BlockedBy,
541549
stats_timer = StatsTimer,
542550
keepalive = Keepalive,
543551
conn_name = ConnName
@@ -552,7 +560,7 @@ format_state(#state{socket = Socket,
552560
rabbit_mqtt_processor:format_status(PState)
553561
end,
554562
connection_state => ConnectionState,
555-
conserve => Conserve,
563+
blocked_by => lists:sort(sets:to_list(BlockedBy)),
556564
stats_timer => StatsTimer,
557565
keepalive => Keepalive,
558566
conn_name => ConnName}.

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
parse_state,
3434
processor_state,
3535
state,
36-
conserve_resources,
36+
blocked_by :: sets:set(rabbit_alarm:resource_alarm_source()),
3737
recv_outstanding,
3838
max_frame_size,
3939
current_frame_size,
@@ -82,7 +82,7 @@ init([SupHelperPid, Ref, Configuration]) ->
8282
[self(), ConnName]),
8383

8484
ParseState = rabbit_stomp_frame:initial_state(),
85-
_ = register_resource_alarm(),
85+
Alarms = register_resource_alarm(),
8686

8787
LoginTimeout = application:get_env(rabbitmq_stomp, login_timeout, 10_000),
8888
MaxFrameSize = application:get_env(rabbitmq_stomp, max_frame_size, ?DEFAULT_MAX_FRAME_SIZE),
@@ -100,7 +100,7 @@ init([SupHelperPid, Ref, Configuration]) ->
100100
max_frame_size = MaxFrameSize,
101101
current_frame_size = 0,
102102
state = running,
103-
conserve_resources = false,
103+
blocked_by = sets:from_list(Alarms, [{version, 2}]),
104104
recv_outstanding = false})), #reader_state.stats_timer),
105105
{backoff, 1000, 1000, 10000});
106106
{error, enotconn} ->
@@ -146,8 +146,15 @@ handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
146146
{stop, {inet_error, Reason}, State};
147147
handle_info(emit_stats, State) ->
148148
{noreply, emit_stats(State), hibernate};
149-
handle_info({conserve_resources, Conserve}, State) ->
150-
NewState = State#reader_state{conserve_resources = Conserve},
149+
handle_info({conserve_resources, Source, Conserve},
150+
#reader_state{blocked_by = BlockedBy0} = State) ->
151+
BlockedBy = case Conserve of
152+
true ->
153+
sets:add_element(Source, BlockedBy0);
154+
false ->
155+
sets:del_element(Source, BlockedBy0)
156+
end,
157+
NewState = State#reader_state{blocked_by = BlockedBy},
151158
{noreply, run_socket(control_throttle(NewState)), hibernate};
152159
handle_info({bump_credit, Msg}, State) ->
153160
credit_flow:handle_bump_msg(Msg),
@@ -288,18 +295,19 @@ process_received_bytes(Bytes,
288295
-spec conserve_resources(pid(),
289296
rabbit_alarm:resource_alarm_source(),
290297
rabbit_alarm:resource_alert()) -> ok.
291-
conserve_resources(Pid, _Source, {_, Conserve, _}) ->
292-
Pid ! {conserve_resources, Conserve},
298+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
299+
Pid ! {conserve_resources, Source, Conserve},
293300
ok.
294301

295302
register_resource_alarm() ->
296303
rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}).
297304

298305

299-
control_throttle(State = #reader_state{state = CS,
300-
conserve_resources = Mem,
306+
control_throttle(State = #reader_state{state = CS,
307+
blocked_by = BlockedBy,
301308
heartbeat = Heartbeat}) ->
302-
case {CS, Mem orelse credit_flow:blocked()} of
309+
Conserve = not sets:is_empty(BlockedBy),
310+
case {CS, Conserve orelse credit_flow:blocked()} of
303311
{running, true} -> State#reader_state{state = blocking};
304312
{blocking, false} -> rabbit_heartbeat:resume_monitor(Heartbeat),
305313
State#reader_state{state = running};

deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
proc_state = connect_packet_unprocessed :: connect_packet_unprocessed |
3939
rabbit_mqtt_processor:state(),
4040
connection_state = running :: running | blocked,
41-
conserve = false :: boolean(),
41+
blocked_by = sets:new([{version, 2}]) :: sets:set(rabbit_alarm:resource_alarm_source()),
4242
stats_timer :: option(rabbit_event:state()),
4343
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
4444
conn_name :: option(binary())
@@ -141,8 +141,9 @@ websocket_init(State0 = #state{socket = Sock}) ->
141141
{ok, ConnStr} ->
142142
ConnName = rabbit_data_coercion:to_binary(ConnStr),
143143
?LOG_INFO("Accepting Web MQTT connection ~s", [ConnName]),
144-
_ = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
145-
State = State0#state{conn_name = ConnName},
144+
Alarms = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
145+
State = State0#state{conn_name = ConnName,
146+
blocked_by = sets:from_list(Alarms, [{version, 2}])},
146147
process_flag(trap_exit, true),
147148
{[], State, hibernate};
148149
{error, Reason} ->
@@ -152,8 +153,8 @@ websocket_init(State0 = #state{socket = Sock}) ->
152153
-spec conserve_resources(pid(),
153154
rabbit_alarm:resource_alarm_source(),
154155
rabbit_alarm:resource_alert()) -> ok.
155-
conserve_resources(Pid, _, {_, Conserve, _}) ->
156-
Pid ! {conserve_resources, Conserve},
156+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
157+
Pid ! {conserve_resources, Source, Conserve},
157158
ok.
158159

159160
-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
@@ -176,8 +177,15 @@ websocket_handle(Frame, State) ->
176177
-spec websocket_info(any(), State) ->
177178
{cowboy_websocket:commands(), State} |
178179
{cowboy_websocket:commands(), State, hibernate}.
179-
websocket_info({conserve_resources, Conserve}, State) ->
180-
handle_credits(State#state{conserve = Conserve});
180+
websocket_info({conserve_resources, Source, Conserve},
181+
#state{blocked_by = BlockedBy0} = State) ->
182+
BlockedBy = case Conserve of
183+
true ->
184+
sets:add_element(Source, BlockedBy0);
185+
false ->
186+
sets:del_element(Source, BlockedBy0)
187+
end,
188+
handle_credits(State#state{blocked_by = BlockedBy});
181189
websocket_info({bump_credit, Msg}, State) ->
182190
credit_flow:handle_bump_msg(Msg),
183191
handle_credits(State);
@@ -402,10 +410,11 @@ handle_credits(State0) ->
402410
{[{active, Active}], State, hibernate}.
403411

404412
control_throttle(State = #state{connection_state = ConnState,
405-
conserve = Conserve,
413+
blocked_by = BlockedBy,
406414
proc_state = PState,
407415
keepalive = KState
408416
}) ->
417+
Conserve = not sets:is_empty(BlockedBy),
409418
Throttle = case PState of
410419
connect_packet_unprocessed -> Conserve;
411420
_ -> rabbit_mqtt_processor:throttle(Conserve, PState)

0 commit comments

Comments
 (0)