diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 0c195b04ab2..a43371cc816 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -411,16 +411,16 @@ apply(#{index := Index, Effects2 = [reply_log_effect(RaftIdx, MsgId, Header, messages_ready(State4), From) | Effects1], - {State, _DroppedMsg, Effects} = - evaluate_limit(Index, false, State0, State4, Effects2), + {State, Effects} = evaluate_limit(Index, State0, + State4, Effects2), {State, '$ra_no_reply', Effects}; {nochange, _ExpiredMsg = true, State2, Effects0} -> %% All ready messages expired. State3 = State2#?STATE{consumers = maps:remove(ConsumerId, State2#?STATE.consumers)}, - {State, _, Effects} = evaluate_limit(Index, false, State0, - State3, Effects0), + {State, Effects} = evaluate_limit(Index, State0, + State3, Effects0), {State, {dequeue, empty}, Effects} end end; @@ -515,8 +515,7 @@ apply(#{index := Index}, #purge{}, }, Effects0 = [{aux, force_checkpoint}, garbage_collection], Reply = {purge, NumReady}, - {State, _, Effects} = evaluate_limit(Index, false, State0, - State1, Effects0), + {State, Effects} = evaluate_limit(Index, State0, State1, Effects0), {State, Reply, Effects}; apply(#{index := _Idx}, #garbage_collection{}, State) -> {State, ok, [{aux, garbage_collection}]}; @@ -1662,7 +1661,6 @@ combine_effects([{mod_call, combine_effects(New, Old) -> New ++ Old. - maybe_set_msg_ttl(Msg, RaCmdTs, Header, #?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) -> case mc:is(Msg) of @@ -1994,10 +1992,8 @@ checkout(#{index := Index} = Meta, State2 = State1#?STATE{msg_cache = undefined, dlx = DlxState}, Effects2 = DlxDeliveryEffects ++ Effects1, - case evaluate_limit(Index, false, OldState, State2, Effects2) of - {State, _, Effects} -> - {State, Reply, Effects} - end. + {State, Effects} = evaluate_limit(Index, OldState, State2, Effects2), + {State, Reply, Effects}. checkout0(Meta, {success, ConsumerKey, MsgId, ?MSG(_, _) = Msg, ExpiredMsg, State, Effects}, @@ -2014,29 +2010,37 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) -> Effects = add_delivery_effects(Effects0, SendAcc, State0), {State0, ExpiredMsg, lists:reverse(Effects)}. -evaluate_limit(_Index, Result, - #?STATE{cfg = #cfg{max_length = undefined, - max_bytes = undefined}}, - #?STATE{cfg = #cfg{max_length = undefined, - max_bytes = undefined}} = State, - Effects) -> - {State, Result, Effects}; -evaluate_limit(_Index, Result, _BeforeState, - #?STATE{cfg = #cfg{max_length = undefined, - max_bytes = undefined}, - enqueuers = Enqs0} = State0, - Effects0) -> +evaluate_limit(Idx, State1, State2, OuterEffects) -> + case evaluate_limit0(Idx, State1, State2, []) of + {State, []} -> + {State, OuterEffects}; + {State, Effects} -> + {State, OuterEffects ++ lists:reverse(Effects)} + end. + +evaluate_limit0(_Index, + #?STATE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}, + #?STATE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}} = State, + Effects) -> + {State, Effects}; +evaluate_limit0(_Index, _BeforeState, + #?STATE{cfg = #cfg{max_length = undefined, + max_bytes = undefined}, + enqueuers = Enqs0} = State0, + Effects0) -> %% max_length and/or max_bytes policies have just been deleted {Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0), - {State0#?STATE{enqueuers = Enqs}, Result, Effects}; -evaluate_limit(Index, Result, BeforeState, - #?STATE{cfg = #cfg{overflow_strategy = Strategy}, - enqueuers = Enqs0} = State0, - Effects0) -> + {State0#?STATE{enqueuers = Enqs}, Effects}; +evaluate_limit0(Index, BeforeState, + #?STATE{cfg = #cfg{overflow_strategy = Strategy}, + enqueuers = Enqs0} = State0, + Effects0) -> case is_over_limit(State0) of true when Strategy == drop_head -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(Index, true, BeforeState, State, Effects); + evaluate_limit0(Index, BeforeState, State, Effects); true when Strategy == reject_publish -> %% generate send_msg effect for each enqueuer to let them know %% they need to block @@ -2050,7 +2054,7 @@ evaluate_limit(Index, Result, BeforeState, (_P, _E, Acc) -> Acc end, {Enqs0, Effects0}, Enqs0), - {State0#?STATE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Effects}; false when Strategy == reject_publish -> %% TODO: optimise as this case gets called for every command %% pretty much @@ -2059,12 +2063,12 @@ evaluate_limit(Index, Result, BeforeState, {false, true} -> %% we have moved below the lower limit {Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0), - {State0#?STATE{enqueuers = Enqs}, Result, Effects}; + {State0#?STATE{enqueuers = Enqs}, Effects}; _ -> - {State0, Result, Effects0} + {State0, Effects0} end; false -> - {State0, Result, Effects0} + {State0, Effects0} end. unblock_enqueuers(Enqs0, Effects0) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1a3fed31227..d9da7ac55fa 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -167,6 +167,7 @@ all_tests() -> subscribe_redelivery_count, message_bytes_metrics, queue_length_limit_drop_head, + queue_length_bytes_limit_drop_head, queue_length_limit_reject_publish, queue_length_limit_policy_cleared, subscribe_redelivery_limit, @@ -3697,6 +3698,50 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_bytes_limit_drop_head(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLQ = <<"dead letter queue">>, + + ?assertEqual({'queue.declare_ok', DLQ, 0, 0}, + declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-overflow">>, longstr, <<"drop-head">>}, + {<<"x-max-length-bytes">>, long, 1000}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLQ}])), + + LargePayload = binary:copy(<<"x">>, 1500), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m1">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = <<"m2">>}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{payload = LargePayload}), + wait_for_consensus(QQ, Config), + wait_for_consensus(DLQ, Config), + RaName = ra_name(DLQ), + wait_for_messages_ready(Servers, RaName, 3), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}}, + amqp_channel:call(Ch, #'basic.get'{queue = DLQ, + no_ack = true})), + + [?assertEqual(#'queue.delete_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})) + || Q <- [QQ, DLQ]]. + queue_length_limit_reject_publish(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/release-notes/4.2.1.md b/release-notes/4.2.1.md new file mode 100644 index 00000000000..f005d9c03a0 --- /dev/null +++ b/release-notes/4.2.1.md @@ -0,0 +1,11 @@ +## RabbitMQ 4.2.1 + +RabbitMQ `4.2.1` is a maintenance release in the `4.2.x` [release series](https://www.rabbitmq.com/release-information). + +### Core Server + +#### Bug Fixes + + * Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order. + + GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)