Skip to content

Commit 20c4b80

Browse files
committed
Fix channel crash when QQ is deleted while a message is published to it
There is a race condition: when a QQ shuts down while a channel is trying to deliver a message to it, the `gen_statem:call` exits. This exit is caught in `rabbit_queue_type:deliver/4` but was not handled, which resulted in a `case_clause` in `rabbit_channel:delivery_to_queues/3`. This not only crashed the channel process but also interrupted the delivery: in a fan-out scenario, the message was not delivered to the queues that came after the shut-down queue in the destination list. With this fix, the message is published to all other destination queues and a rejected publisher confirm response is sent. The fix was adapted from PR rabbitmq#12713, commit f2b1f37 (which relies on a change in `ra` 2.16.0), included in RabbitMQ 4.1.0. Partially fixes rabbitmq#10344
1 parent ef0e0e1 commit 20c4b80

File tree

2 files changed

+183
-1
lines changed

2 files changed

+183
-1
lines changed

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ enqueue(QName, Correlation, Msg,
119119
timeout = Timeout}} = State0) ->
120120
%% the first publish, register and enqueuer for this process.
121121
Reg = rabbit_fifo:make_register_enqueuer(self()),
122-
case ra:process_command(Servers, Reg, Timeout) of
122+
try ra:process_command(Servers, Reg, Timeout) of
123123
{ok, reject_publish, Leader} ->
124124
{reject_publish, State0#state{leader = Leader,
125125
queue_status = reject_publish}};
@@ -134,7 +134,14 @@ enqueue(QName, Correlation, Msg,
134134
{timeout, _} ->
135135
{reject_publish, State0};
136136
Err ->
137+
rabbit_log:debug("~ts: QQ ~ts error when registering enqueuer ~p",
138+
[?MODULE, rabbit_misc:rs(QName), Err]),
137139
exit(Err)
140+
catch
141+
exit:{{shutdown, delete}, _} ->
142+
rabbit_log:debug("~ts: QQ ~ts tried to register enqueuer during delete shutdown",
143+
[?MODULE, rabbit_misc:rs(QName)]),
144+
{reject_publish, State0}
138145
end;
139146
enqueue(_QName, _Correlation, _Msg,
140147
#state{queue_status = reject_publish,
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
-module(handling_crash_during_fanout_SUITE).
2+
3+
-include_lib("eunit/include/eunit.hrl").
4+
-include_lib("amqp_client/include/amqp_client.hrl").
5+
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
6+
7+
-export([suite/0,
8+
all/0,
9+
init_per_suite/1,
10+
end_per_suite/1,
11+
init_per_testcase/2,
12+
end_per_testcase/2]).
13+
14+
-export([test_fanout_crash/1]).
15+
16+
%%%===================================================================
17+
%%% Common Test callbacks
18+
%%%===================================================================
19+
%% @doc Suite-wide Common Test defaults: a three-minute timetrap.
suite() ->
    Timetrap = {timetrap, {minutes, 3}},
    [Timetrap].
21+
22+
%% @doc Logs the test environment, then runs the generic setup steps with no
%% additional suite-specific steps. Returns whatever
%% `rabbit_ct_helpers:run_setup_steps/2' returns.
init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    NoExtraSteps = [],
    rabbit_ct_helpers:run_setup_steps(Config, NoExtraSteps).
25+
26+
%% @doc Runs the generic teardown steps and returns their result.
end_per_suite(Config) ->
    TornDown = rabbit_ct_helpers:run_teardown_steps(Config),
    TornDown.
28+
29+
%% @doc Marks the test case as started, stores the cluster settings for a
%% three-node setup in the config (the test case name doubles as node name
%% suffix and, converted to a binary, as queue name prefix), then runs the
%% broker and client setup steps.
init_per_testcase(Testcase, Config) ->
    Started = rabbit_ct_helpers:testcase_started(Config, Testcase),
    QueuePrefix = rabbit_data_coercion:to_binary(Testcase),
    ClusterSettings = [{rmq_nodes_count, 3},
                       {rmq_nodename_suffix, Testcase},
                       {tcp_ports_base},
                       {queue_prefix, QueuePrefix}],
    Configured = rabbit_ct_helpers:set_config(Started, ClusterSettings),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(Configured, SetupSteps).
42+
43+
%% @doc Runs the client teardown steps followed by the broker teardown steps,
%% then marks the test case as finished.
end_per_testcase(Testcase, Config) ->
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    Stopped = rabbit_ct_helpers:run_steps(Config, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(Stopped, Testcase).
48+
49+
%% @doc Lists the test cases of this suite.
all() ->
    TestCases = [test_fanout_crash],
    TestCases.
51+
52+
%%%===================================================================
53+
%%% Test cases
54+
%%%===================================================================
55+
%% @doc Publishes one message to `amq.fanout' while the first of ten bound
%% quorum queues is being deleted.
%%
%% The Ra process of the first queue is suspended, a `queue.delete' for it is
%% issued on a second channel, and only then the message is published and the
%% process resumed. The test expects that:
%%   * every queue except the deleted one delivers the message to its
%%     consumer, and
%%   * the publisher receives a `basic.nack' for the message.
test_fanout_crash(Config) ->
    [Server, AuxServer | _] =
        rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    QPrefix = proplists:get_value(queue_prefix, Config),

    Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
    AuxCh = rabbit_ct_client_helpers:open_channel(Config, AuxServer),
    #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
    %% Register the test process as confirm handler before anything is
    %% published, so the confirm can never arrive while no handler is set.
    amqp_channel:register_confirm_handler(Ch, self()),

    QueuesCount = 10,
    [FirstQueue | OtherQueues] = Queues =
        [<<QPrefix/binary, "_", (integer_to_binary(N))/binary>>
         || N <- lists:seq(1, QueuesCount)],

    lists:foreach(
      fun(QName) ->
              ?assertEqual({'queue.declare_ok', QName, 0, 0},
                           declare(Ch, QName,
                                   [{<<"x-queue-type">>, longstr, <<"quorum">>}]))
      end, Queues),

    Fanout = <<"amq.fanout">>,
    lists:foreach(
      fun(QName) ->
              #'queue.bind_ok'{} =
                  amqp_channel:call(Ch, #'queue.bind'{exchange = Fanout,
                                                      queue = QName,
                                                      routing_key = <<>>})
      end, Queues),

    %% Setup consumers
    Consumers = [setup_consumer(Config, Server, QName, true) || QName <- Queues],

    RaName = queue_utils:ra_name(FirstQueue),

    %% Suspend the queue process for the first queue
    rabbit_ct_broker_helpers:rpc(Config, 0, sys, suspend, [RaName]),

    {messages, Msg0} =
        rabbit_ct_broker_helpers:rpc(Config, 0, recon, info, [RaName, messages]),
    ct:pal("queue proc messages before delete request ~p", [Msg0]),

    %% Delete the first queue (will apply on sys:resume)
    amqp_channel:cast(AuxCh, #'queue.delete'{queue = FirstQueue}),

    %% Wait until at least two more messages are sitting in the suspended
    %% process' mailbox than before the delete request was sent.
    Msg1 =
        ?awaitMatch(
           Msgs11 when length(Msgs11) >= 2 + length(Msg0),
           begin
               {messages, Msgs11} =
                   rabbit_ct_broker_helpers:rpc(Config, 0, recon, info,
                                                [RaName, messages]),
               Msgs11
           end,
           3000),
    ct:pal("queue proc messages after delete request ~p", [Msg1]),

    %% Publish a message to the fanout exchange
    amqp_channel:cast(Ch, #'basic.publish'{exchange = Fanout, routing_key = <<>>},
                      #amqp_msg{payload = <<"test message">>}),

    %% Resume the queue process for the first queue
    rabbit_ct_broker_helpers:rpc(Config, 0, sys, resume, [RaName]),

    %% Collect at most one report per consumer, waiting up to a second for each.
    ConsumersResults = lists:foldl(fun(_Consumer, Acc) ->
                                           receive
                                               {consumer, Msg} -> [Msg | Acc]
                                           after 1000 ->
                                                   Acc
                                           end
                                   end, [], Consumers),
    ct:pal("consumer results ~p", [ConsumersResults]),

    %% Only actual deliveries count: a late `no_messages_received' or
    %% `consumer_timeout' report must neither satisfy nor break the check.
    Delivered = [Q || {message_received, Q} <- ConsumersResults],
    ?assertEqual(lists:sort(OtherQueues), lists:sort(Delivered)),

    %% Selectively wait for the confirm only, so that a stray consumer report
    %% in the mailbox is not mistaken for the publisher confirm.
    ?assertEqual(
       nack,
       receive
           #'basic.ack'{} ->
               ack;
           #'basic.nack'{} ->
               nack
       after 1000 ->
               confirm_timeout
       end),

    ok.
142+
143+
%%%===================================================================
144+
%%% Utilities
145+
%%%===================================================================
146+
%% @doc Declares the durable, non-auto-delete queue `Q' with the arguments
%% `Args' on channel `Ch' and returns the reply of the synchronous call.
declare(Ch, Q, Args) ->
    Declare = #'queue.declare'{queue = Q,
                               durable = true,
                               auto_delete = false,
                               arguments = Args},
    amqp_channel:call(Ch, Declare).
151+
152+
%% @doc Spawns a process that opens its own channel to `Server', subscribes to
%% `Queue' (using the queue name as consumer tag) and reports exactly one
%% outcome to the calling process as `{consumer, {Outcome, Queue}}':
%%   * `message_received'     - a delivery arrived within 5 seconds of the
%%                              `basic.consume_ok';
%%   * `no_messages_received' - no delivery arrived in that time;
%%   * `consumer_timeout'     - no `basic.consume_ok' arrived within 1 second.
%% Returns the pid of the spawned process.
setup_consumer(Config, Server, Queue, NoAck) ->
    TestPid = self(),
    Report = fun(Outcome) -> TestPid ! {consumer, {Outcome, Queue}} end,
    spawn(
      fun() ->
              ConsumerCh = rabbit_ct_client_helpers:open_channel(Config, Server),
              Subscription = #'basic.consume'{queue = Queue,
                                              no_ack = NoAck,
                                              consumer_tag = Queue},
              amqp_channel:subscribe(ConsumerCh, Subscription, self()),
              receive
                  #'basic.consume_ok'{} ->
                      %% Subscribed; now wait for a single delivery.
                      receive
                          {#'basic.deliver'{}, _Content} ->
                              ct:pal("Message received on queue ~p", [Queue]),
                              Report(message_received)
                      after 5000 ->
                              ct:pal("No messages received on queue ~p", [Queue]),
                              Report(no_messages_received)
                      end
              after 1000 ->
                      ct:pal("Timeout starting consumer for ~p", [Queue]),
                      Report(consumer_timeout)
              end
      end).

0 commit comments

Comments
 (0)