Skip to content

Commit 3147a0c

Browse files
kjnilssonmergify[bot]
authored andcommitted
QQ: fix bug with discards using a consumer_id()
Fixes a pattern matching bug for discards that come in after a consumer has been cancelled. Because the rabbit_fifo_client does not keep the integer consumer key after cancellation, late acks, returns, and discards use the full {CTag, Pid} consumer id version. As this is a state machine change the machine version has been increased to 5. The same bug is present for the `modify` command also however as AMQP does not allow late settlements we don't have to make this fix conditional on the machine version as it cannot happen. (cherry picked from commit 2339401)
1 parent 44a4e8e commit 3147a0c

File tree

3 files changed

+75
-49
lines changed

3 files changed

+75
-49
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,27 @@ apply(Meta, #settle{msg_ids = MsgIds,
265265
_ ->
266266
{State, ok}
267267
end;
268-
apply(Meta, #discard{consumer_key = ConsumerKey,
269-
msg_ids = MsgIds},
268+
apply(#{machine_version := 4} = Meta,
269+
#discard{consumer_key = ConsumerKey,
270+
msg_ids = MsgIds},
270271
#?STATE{consumers = Consumers } = State0) ->
272+
%% buggy version that would have not found the consumer if the ConsumerKey
273+
%% was a consumer_id()
271274
case find_consumer(ConsumerKey, Consumers) of
272275
{ConsumerKey, #consumer{} = Con} ->
273276
discard(Meta, MsgIds, ConsumerKey, Con, true, #{}, State0);
274277
_ ->
275278
{State0, ok}
276279
end;
280+
apply(Meta, #discard{consumer_key = ConsumerKey,
281+
msg_ids = MsgIds},
282+
#?STATE{consumers = Consumers } = State0) ->
283+
case find_consumer(ConsumerKey, Consumers) of
284+
{ActualConsumerKey, #consumer{} = Con} ->
285+
discard(Meta, MsgIds, ActualConsumerKey, Con, true, #{}, State0);
286+
_ ->
287+
{State0, ok}
288+
end;
277289
apply(Meta, #return{consumer_key = ConsumerKey,
278290
msg_ids = MsgIds},
279291
#?STATE{consumers = Cons} = State) ->
@@ -291,13 +303,14 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
291303
msg_ids = MsgIds},
292304
#?STATE{consumers = Cons} = State) ->
293305
case find_consumer(ConsumerKey, Cons) of
294-
{ConsumerKey, #consumer{checked_out = Checked}}
306+
{ActualConsumerKey, #consumer{checked_out = Checked}}
295307
when Undel == false ->
296-
return(Meta, ConsumerKey, MsgIds, DelFailed,
308+
return(Meta, ActualConsumerKey, MsgIds, DelFailed,
297309
Anns, Checked, [], State);
298-
{ConsumerKey, #consumer{} = Con}
310+
{ActualConsumerKey, #consumer{} = Con}
299311
when Undel == true ->
300-
discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State);
312+
discard(Meta, MsgIds, ActualConsumerKey,
313+
Con, DelFailed, Anns, State);
301314
_ ->
302315
{State, ok}
303316
end;
@@ -898,13 +911,14 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
898911
end.
899912

900913
-spec version() -> pos_integer().
901-
version() -> 4.
914+
version() -> 5.
902915

903916
which_module(0) -> rabbit_fifo_v0;
904917
which_module(1) -> rabbit_fifo_v1;
905918
which_module(2) -> rabbit_fifo_v3;
906919
which_module(3) -> rabbit_fifo_v3;
907-
which_module(4) -> ?MODULE.
920+
which_module(4) -> ?MODULE;
921+
which_module(5) -> ?MODULE.
908922

909923
-define(AUX, aux_v3).
910924

@@ -2520,15 +2534,15 @@ make_checkout({_, _} = ConsumerId, Spec0, Meta) ->
25202534
make_settle(ConsumerKey, MsgIds) when is_list(MsgIds) ->
25212535
#settle{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25222536

2523-
-spec make_return(consumer_id(), [msg_id()]) -> protocol().
2537+
-spec make_return(consumer_key(), [msg_id()]) -> protocol().
25242538
make_return(ConsumerKey, MsgIds) ->
25252539
#return{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25262540

25272541
-spec is_return(protocol()) -> boolean().
25282542
is_return(Command) ->
25292543
is_record(Command, return).
25302544

2531-
-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
2545+
-spec make_discard(consumer_key(), [msg_id()]) -> protocol().
25322546
make_discard(ConsumerKey, MsgIds) ->
25332547
#discard{consumer_key = ConsumerKey, msg_ids = MsgIds}.
25342548

@@ -2701,7 +2715,10 @@ convert(Meta, 1, To, State) ->
27012715
convert(Meta, 2, To, State) ->
27022716
convert(Meta, 3, To, rabbit_fifo_v3:convert_v2_to_v3(State));
27032717
convert(Meta, 3, To, State) ->
2704-
convert(Meta, 4, To, convert_v3_to_v4(Meta, State)).
2718+
convert(Meta, 4, To, convert_v3_to_v4(Meta, State));
2719+
convert(Meta, 4, To, State) ->
2720+
%% no conversion needed, this version only includes a logic change
2721+
convert(Meta, 5, To, State).
27052722

27062723
smallest_raft_index(#?STATE{messages = Messages,
27072724
ra_indexes = Indexes,

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ all_tests() ->
174174
per_message_ttl_expiration_too_high,
175175
consumer_priorities,
176176
cancel_consumer_gh_3729,
177+
cancel_consumer_gh_12424,
177178
cancel_and_consume_with_same_tag,
178179
validate_messages_on_queue,
179180
amqpl_headers,
@@ -3602,6 +3603,37 @@ cancel_consumer_gh_3729(Config) ->
36023603

36033604
ok = rabbit_ct_client_helpers:close_channel(Ch).
36043605

3606+
cancel_consumer_gh_12424(Config) ->
3607+
QQ = ?config(queue_name, Config),
3608+
3609+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
3610+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
3611+
3612+
ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0},
3613+
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
3614+
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),
3615+
3616+
ok = publish(Ch, QQ),
3617+
3618+
ok = subscribe(Ch, QQ, false),
3619+
3620+
DeliveryTag = receive
3621+
{#'basic.deliver'{delivery_tag = DT}, _} ->
3622+
DT
3623+
after 5000 ->
3624+
flush(100),
3625+
ct:fail("basic.deliver timeout")
3626+
end,
3627+
3628+
ok = cancel(Ch),
3629+
3630+
R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = false},
3631+
ok = amqp_channel:cast(Ch, R),
3632+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
3633+
3634+
ok.
3635+
3636+
%% Test the scenario where a message is published to a quorum queue
36053637
cancel_and_consume_with_same_tag(Config) ->
36063638
%% https://github.com/rabbitmq/rabbitmq-server/issues/5927
36073639
QQ = ?config(queue_name, Config),

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ groups() ->
4242
].
4343

4444
init_per_group(tests, Config) ->
45-
[{machine_version, 4} | Config];
45+
[{machine_version, 5} | Config];
4646
init_per_group(machine_version_conversion, Config) ->
4747
Config.
4848

4949
init_per_testcase(_Testcase, Config) ->
50-
FF = ?config(machine_version, Config) == 4,
50+
FF = ?config(machine_version, Config) == 5,
5151
ok = meck:new(rabbit_feature_flags, [passthrough]),
5252
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end),
5353
Config.
@@ -804,6 +804,19 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(Config) ->
804804

805805
ok.
806806

807+
discard_after_cancel_test(Config) ->
808+
Cid = {?FUNCTION_NAME_B, self()},
809+
{State0, _} = enq(Config, 1, 1, first, test_init(test)),
810+
{State1, #{key := _CKey,
811+
next_msg_id := MsgId}, _Effects1} =
812+
checkout(Config, ?LINE, Cid, 10, State0),
813+
{State2, _, _} = apply(meta(Config, ?LINE),
814+
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
815+
{State, _, _} = apply(meta(Config, ?LINE),
816+
rabbit_fifo:make_discard(Cid, [MsgId]), State2),
817+
ct:pal("State ~p", [State]),
818+
ok.
819+
807820
enqueued_msg_with_delivery_count_test(Config) ->
808821
State00 = init(#{name => test,
809822
queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
@@ -2786,45 +2799,9 @@ modify_test(Config) ->
27862799

27872800
ok.
27882801

2789-
ttb_test(Config) ->
2790-
S0 = init(#{name => ?FUNCTION_NAME,
2791-
queue_resource =>
2792-
rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}),
2793-
2794-
2795-
S1 = do_n(5_000_000,
2796-
fun (N, Acc) ->
2797-
I = (5_000_000 - N),
2798-
element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc))
2799-
end, S0),
2800-
2801-
2802-
2803-
{T1, _Res} = timer:tc(fun () ->
2804-
do_n(100, fun (_, S) ->
2805-
term_to_binary(S),
2806-
S1 end, S1)
2807-
end),
2808-
ct:pal("T1 took ~bus", [T1]),
2809-
2810-
2811-
{T2, _} = timer:tc(fun () ->
2812-
do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1)
2813-
end),
2814-
ct:pal("T2 took ~bus", [T2]),
2815-
2816-
ok.
2817-
28182802
%% Utility
28192803
%%
28202804

2821-
do_n(0, _, A) ->
2822-
A;
2823-
do_n(N, Fun, A0) ->
2824-
A = Fun(N, A0),
2825-
do_n(N-1, Fun, A).
2826-
2827-
28282805
init(Conf) -> rabbit_fifo:init(Conf).
28292806
make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid).
28302807
apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).

0 commit comments

Comments
 (0)