From 3147a0c4ab02b4d9da6b866bd7726317a900bc3b Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 3 Oct 2024 12:15:29 +0100 Subject: [PATCH] QQ: fix bug with discards using a consumer_id() Fixes a pattern matching bug for discards that come in after a consumer has been cancelled. Because the rabbit_fifo_client does not keep the integer consumer key after cancellation, late acks, returns, and discards use the full {CTag, Pid} consumer id version. As this is a state machine change the machine version has been increased to 5. The same bug is present for the `modify` command also however as AMQP does not allow late settlements we don't have to make this fix conditional on the machine version as it cannot happen. (cherry picked from commit 2339401abe44abdc220c2d4bdb5598800812205a) --- deps/rabbit/src/rabbit_fifo.erl | 39 +++++++++++++----- deps/rabbit/test/quorum_queue_SUITE.erl | 32 +++++++++++++++ deps/rabbit/test/rabbit_fifo_SUITE.erl | 53 +++++++------------------ 3 files changed, 75 insertions(+), 49 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 1960eaf03a65..b0f0a43967fb 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -265,15 +265,27 @@ apply(Meta, #settle{msg_ids = MsgIds, _ -> {State, ok} end; -apply(Meta, #discard{consumer_key = ConsumerKey, - msg_ids = MsgIds}, +apply(#{machine_version := 4} = Meta, + #discard{consumer_key = ConsumerKey, + msg_ids = MsgIds}, #?STATE{consumers = Consumers } = State0) -> + %% buggy version that would have not found the consumer if the ConsumerKey + %% was a consumer_id() case find_consumer(ConsumerKey, Consumers) of {ConsumerKey, #consumer{} = Con} -> discard(Meta, MsgIds, ConsumerKey, Con, true, #{}, State0); _ -> {State0, ok} end; +apply(Meta, #discard{consumer_key = ConsumerKey, + msg_ids = MsgIds}, + #?STATE{consumers = Consumers } = State0) -> + case find_consumer(ConsumerKey, Consumers) of + {ActualConsumerKey, #consumer{} = Con} -> + discard(Meta, MsgIds, ActualConsumerKey, Con, true, #{}, State0); + _ -> + {State0, ok} + end; apply(Meta, #return{consumer_key = ConsumerKey, msg_ids = MsgIds}, #?STATE{consumers = Cons} = State) -> @@ -291,13 +303,14 @@ apply(Meta, #modify{consumer_key = ConsumerKey, msg_ids = MsgIds}, #?STATE{consumers = Cons} = State) -> case find_consumer(ConsumerKey, Cons) of - {ConsumerKey, #consumer{checked_out = Checked}} + {ActualConsumerKey, #consumer{checked_out = Checked}} when Undel == false -> - return(Meta, ConsumerKey, MsgIds, DelFailed, + return(Meta, ActualConsumerKey, MsgIds, DelFailed, Anns, Checked, [], State); - {ConsumerKey, #consumer{} = Con} + {ActualConsumerKey, #consumer{} = Con} when Undel == true -> - discard(Meta, MsgIds, ConsumerKey, Con, DelFailed, Anns, State); + discard(Meta, MsgIds, ActualConsumerKey, + Con, DelFailed, Anns, State); _ -> {State, ok} end; @@ -898,13 +911,14 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) -> end. -spec version() -> pos_integer(). -version() -> 4. +version() -> 5. which_module(0) -> rabbit_fifo_v0; which_module(1) -> rabbit_fifo_v1; which_module(2) -> rabbit_fifo_v3; which_module(3) -> rabbit_fifo_v3; -which_module(4) -> ?MODULE. +which_module(4) -> ?MODULE; +which_module(5) -> ?MODULE. -define(AUX, aux_v3). @@ -2520,7 +2534,7 @@ make_checkout({_, _} = ConsumerId, Spec0, Meta) -> make_settle(ConsumerKey, MsgIds) when is_list(MsgIds) -> #settle{consumer_key = ConsumerKey, msg_ids = MsgIds}. --spec make_return(consumer_id(), [msg_id()]) -> protocol(). +-spec make_return(consumer_key(), [msg_id()]) -> protocol(). make_return(ConsumerKey, MsgIds) -> #return{consumer_key = ConsumerKey, msg_ids = MsgIds}. @@ -2528,7 +2542,7 @@ make_return(ConsumerKey, MsgIds) -> is_return(Command) -> is_record(Command, return). --spec make_discard(consumer_id(), [msg_id()]) -> protocol(). +-spec make_discard(consumer_key(), [msg_id()]) -> protocol(). make_discard(ConsumerKey, MsgIds) -> #discard{consumer_key = ConsumerKey, msg_ids = MsgIds}. @@ -2701,7 +2715,10 @@ convert(Meta, 1, To, State) -> convert(Meta, 2, To, State) -> convert(Meta, 3, To, rabbit_fifo_v3:convert_v2_to_v3(State)); convert(Meta, 3, To, State) -> - convert(Meta, 4, To, convert_v3_to_v4(Meta, State)). + convert(Meta, 4, To, convert_v3_to_v4(Meta, State)); +convert(Meta, 4, To, State) -> + %% no conversion needed, this version only includes a logic change + convert(Meta, 5, To, State). smallest_raft_index(#?STATE{messages = Messages, ra_indexes = Indexes, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index c54c27d75cc3..221177e3319d 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -174,6 +174,7 @@ all_tests() -> per_message_ttl_expiration_too_high, consumer_priorities, cancel_consumer_gh_3729, + cancel_consumer_gh_12424, cancel_and_consume_with_same_tag, validate_messages_on_queue, amqpl_headers, @@ -3602,6 +3603,37 @@ cancel_consumer_gh_3729(Config) -> ok = rabbit_ct_client_helpers:close_channel(Ch). +cancel_consumer_gh_12424(Config) -> + QQ = ?config(queue_name, Config), + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + ExpectedDeclareRslt0 = #'queue.declare_ok'{queue = QQ, message_count = 0, consumer_count = 0}, + DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0), + + ok = publish(Ch, QQ), + + ok = subscribe(Ch, QQ, false), + + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = DT}, _} -> + DT + after 5000 -> + flush(100), + ct:fail("basic.deliver timeout") + end, + + ok = cancel(Ch), + + R = #'basic.reject'{delivery_tag = DeliveryTag, requeue = false}, + ok = amqp_channel:cast(Ch, R), + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + + ok. + + %% Test the scenario where a message is published to a quorum queue cancel_and_consume_with_same_tag(Config) -> %% https://github.com/rabbitmq/rabbitmq-server/issues/5927 QQ = ?config(queue_name, Config), diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 8d45aecca10f..e14b9406eee8 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -42,12 +42,12 @@ groups() -> ]. init_per_group(tests, Config) -> - [{machine_version, 4} | Config]; + [{machine_version, 5} | Config]; init_per_group(machine_version_conversion, Config) -> Config. init_per_testcase(_Testcase, Config) -> - FF = ?config(machine_version, Config) == 4, + FF = ?config(machine_version, Config) == 5, ok = meck:new(rabbit_feature_flags, [passthrough]), meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> FF end), Config. @@ -804,6 +804,19 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(Config) -> ok. +discard_after_cancel_test(Config) -> + Cid = {?FUNCTION_NAME_B, self()}, + {State0, _} = enq(Config, 1, 1, first, test_init(test)), + {State1, #{key := _CKey, + next_msg_id := MsgId}, _Effects1} = + checkout(Config, ?LINE, Cid, 10, State0), + {State2, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + {State, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_discard(Cid, [MsgId]), State2), + ct:pal("State ~p", [State]), + ok. + enqueued_msg_with_delivery_count_test(Config) -> State00 = init(#{name => test, queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), @@ -2786,45 +2799,9 @@ modify_test(Config) -> ok. -ttb_test(Config) -> - S0 = init(#{name => ?FUNCTION_NAME, - queue_resource => - rabbit_misc:r("/", queue, ?FUNCTION_NAME_B)}), - - - S1 = do_n(5_000_000, - fun (N, Acc) -> - I = (5_000_000 - N), - element(1, enq(Config, I, I, ?FUNCTION_NAME_B, Acc)) - end, S0), - - - - {T1, _Res} = timer:tc(fun () -> - do_n(100, fun (_, S) -> - term_to_binary(S), - S1 end, S1) - end), - ct:pal("T1 took ~bus", [T1]), - - - {T2, _} = timer:tc(fun () -> - do_n(100, fun (_, S) -> term_to_iovec(S), S1 end, S1) - end), - ct:pal("T2 took ~bus", [T2]), - - ok. - %% Utility %% -do_n(0, _, A) -> - A; -do_n(N, Fun, A0) -> - A = Fun(N, A0), - do_n(N-1, Fun, A). - - init(Conf) -> rabbit_fifo:init(Conf). make_register_enqueuer(Pid) -> rabbit_fifo:make_register_enqueuer(Pid). apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).