Skip to content

Commit 36972c3

Browse files
Merge pull request #1998 from rabbitmq/consumer_timeouts_take_2
Always close channel on consumer timeout
2 parents 3976d87 + 1bdd460 commit 36972c3

File tree

3 files changed

+23
-61
lines changed

3 files changed

+23
-61
lines changed

src/rabbit_channel.erl

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) ->
27122712
end.
27132713

27142714
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
2715-
capabilities = Capabilities,
27162715
consumer_timeout = Timeout},
2717-
queue_names = QNames,
2718-
queue_consumers = QCons,
27192716
unacked_message_q = UAMQ}) ->
27202717
Now = os:system_time(millisecond),
27212718
case ?QUEUE:peek(UAMQ) of
2722-
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
2719+
{value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}}
27232720
when is_integer(Timeout)
27242721
andalso Time < Now - Timeout ->
2725-
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
2726-
"waiting on ack",
2727-
[rabbit_data_coercion:to_binary(ConsumerTag),
2728-
Channel]),
2729-
SupportsCancel = case rabbit_misc:table_lookup(
2730-
Capabilities,
2731-
<<"consumer_cancel_notify">>) of
2732-
{bool, true} when is_binary(ConsumerTag) ->
2733-
true;
2734-
_ -> false
2735-
end,
2736-
case SupportsCancel of
2737-
false ->
2738-
Ex = rabbit_misc:amqp_error(precondition_failed,
2739-
"consumer ack timed out on channel ~w",
2740-
[Channel], none),
2741-
handle_exception(Ex, State0);
2742-
true ->
2743-
QRef = qpid_to_ref(QPid),
2744-
QName = maps:get(QRef, QNames),
2745-
%% cancel the consumer with the client
2746-
State2 = cancel_consumer(ConsumerTag, QName, State0),
2747-
[Q] = rabbit_amqqueue:lookup([QName]),
2748-
%% send basic cancel to the queue
2749-
{ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
2750-
Q, self(), ConsumerTag, undefined,
2751-
<<"broker">>, State2#ch.queue_states),
2752-
%% return all in-flight messages for the consumer
2753-
{MsgIds, Rem} = lists:foldl(
2754-
fun({_DelTag, ConTag, _Time, {_, MsgId}},
2755-
{Ids, Rem}) when ConTag == ConsumerTag ->
2756-
{[MsgId | Ids], Rem};
2757-
(Unacked, {Ids, Rem}) ->
2758-
{Ids, ?QUEUE:in(Unacked, Rem)}
2759-
end, {[], ?QUEUE:new()},
2760-
?QUEUE:to_list(UAMQ)),
2761-
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
2762-
self(), QueueStates2),
2763-
{noreply, State2#ch{queue_states = QueueStates,
2764-
queue_consumers = maps:remove(QRef, QCons),
2765-
unacked_message_q = Rem}}
2766-
end;
2722+
rabbit_log_channel:warning("Consumer ~s on channel ~w has timed out "
2723+
"waiting on consumer acknowledgement. Timeout used: ~p ms",
2724+
[rabbit_data_coercion:to_binary(ConsumerTag),
2725+
Channel, Timeout]),
2726+
Ex = rabbit_misc:amqp_error(precondition_failed,
2727+
"consumer ack timed out on channel ~w",
2728+
[Channel], none),
2729+
handle_exception(Ex, State0);
27672730
_ ->
27682731
{noreply, State0}
27692732
end.

test/consumer_timeout_SUITE.erl

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -129,26 +129,26 @@ end_per_testcase(Testcase, Config) ->
129129
rabbit_ct_helpers:testcase_finished(Config, Testcase).
130130

131131
consumer_timeout(Config) ->
132-
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
132+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
133133
QName = ?config(queue_name, Config),
134134
declare_queue(Ch, Config, QName),
135135
publish(Ch, QName, [<<"msg1">>]),
136136
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
137137
subscribe(Ch, QName, false),
138+
erlang:monitor(process, Conn),
139+
erlang:monitor(process, Ch),
138140
receive
139-
{#'basic.deliver'{delivery_tag = _,
140-
redelivered = false}, _} ->
141-
%% do nothing with the delivery should trigger timeout
142-
receive
143-
#'basic.cancel'{ } ->
144-
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
145-
ok
146-
after 20000 ->
147-
flush(1),
148-
exit(cancel_never_happened)
149-
end
150-
after 5000 ->
151-
exit(deliver_timeout)
141+
{'DOWN', _, process, Ch, _} -> ok
142+
after 30000 ->
143+
flush(1),
144+
exit(channel_exit_expected)
145+
end,
146+
receive
147+
{'DOWN', _, process, Conn, _} ->
148+
flush(1),
149+
exit(unexpected_connection_exit)
150+
after 2000 ->
151+
ok
152152
end,
153153
rabbit_ct_client_helpers:close_channel(Ch),
154154
ok.

test/unit_inbroker_non_parallel_SUITE.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ log_file_fails_to_initialise_during_startup(Config) ->
315315

316316
log_file_fails_to_initialise_during_startup1(_Config) ->
317317
[LogFile|_] = rabbit:log_locations(),
318-
Suffix = ".0",
319318

320319
%% start application with logging to directory with no
321320
%% write permissions

0 commit comments

Comments
 (0)