Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,16 @@ apply(#{index := Index,
Effects2 = [reply_log_effect(RaftIdx, MsgId, Header,
messages_ready(State4), From)
| Effects1],
{State, _DroppedMsg, Effects} =
evaluate_limit(Index, false, State0, State4, Effects2),
{State, Effects} = evaluate_limit(Index, State0,
State4, Effects2),
{State, '$ra_no_reply', Effects};
{nochange, _ExpiredMsg = true, State2, Effects0} ->
%% All ready messages expired.
State3 = State2#?STATE{consumers =
maps:remove(ConsumerId,
State2#?STATE.consumers)},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State3, Effects0),
{State, Effects} = evaluate_limit(Index, State0,
State3, Effects0),
{State, {dequeue, empty}, Effects}
end
end;
Expand Down Expand Up @@ -515,8 +515,7 @@ apply(#{index := Index}, #purge{},
},
Effects0 = [{aux, force_checkpoint}, garbage_collection],
Reply = {purge, NumReady},
{State, _, Effects} = evaluate_limit(Index, false, State0,
State1, Effects0),
{State, Effects} = evaluate_limit(Index, State0, State1, Effects0),
{State, Reply, Effects};
apply(#{index := _Idx}, #garbage_collection{}, State) ->
{State, ok, [{aux, garbage_collection}]};
Expand Down Expand Up @@ -1662,7 +1661,6 @@ combine_effects([{mod_call,
combine_effects(New, Old) ->
New ++ Old.


maybe_set_msg_ttl(Msg, RaCmdTs, Header,
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
case mc:is(Msg) of
Expand Down Expand Up @@ -1994,10 +1992,8 @@ checkout(#{index := Index} = Meta,
State2 = State1#?STATE{msg_cache = undefined,
dlx = DlxState},
Effects2 = DlxDeliveryEffects ++ Effects1,
case evaluate_limit(Index, false, OldState, State2, Effects2) of
{State, _, Effects} ->
{State, Reply, Effects}
end.
{State, Effects} = evaluate_limit(Index, OldState, State2, Effects2),
{State, Reply, Effects}.

checkout0(Meta, {success, ConsumerKey, MsgId,
?MSG(_, _) = Msg, ExpiredMsg, State, Effects},
Expand All @@ -2014,29 +2010,37 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
Effects = add_delivery_effects(Effects0, SendAcc, State0),
{State0, ExpiredMsg, lists:reverse(Effects)}.

evaluate_limit(_Index, Result,
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}},
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
evaluate_limit(_Index, Result, _BeforeState,
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined},
enqueuers = Enqs0} = State0,
Effects0) ->
evaluate_limit(Idx, State1, State2, OuterEffects) ->
case evaluate_limit0(Idx, State1, State2, []) of
{State, []} ->
{State, OuterEffects};
{State, Effects} ->
{State, OuterEffects ++ lists:reverse(Effects)}
end.

evaluate_limit0(_Index,
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}},
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Effects};
evaluate_limit0(_Index, _BeforeState,
#?STATE{cfg = #cfg{max_length = undefined,
max_bytes = undefined},
enqueuers = Enqs0} = State0,
Effects0) ->
%% max_length and/or max_bytes policies have just been deleted
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
evaluate_limit(Index, Result, BeforeState,
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
enqueuers = Enqs0} = State0,
Effects0) ->
{State0#?STATE{enqueuers = Enqs}, Effects};
evaluate_limit0(Index, BeforeState,
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
enqueuers = Enqs0} = State0,
Effects0) ->
case is_over_limit(State0) of
true when Strategy == drop_head ->
{State, Effects} = drop_head(State0, Effects0),
evaluate_limit(Index, true, BeforeState, State, Effects);
evaluate_limit0(Index, BeforeState, State, Effects);
true when Strategy == reject_publish ->
%% generate send_msg effect for each enqueuer to let them know
%% they need to block
Expand All @@ -2050,7 +2054,7 @@ evaluate_limit(Index, Result, BeforeState,
(_P, _E, Acc) ->
Acc
end, {Enqs0, Effects0}, Enqs0),
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
{State0#?STATE{enqueuers = Enqs}, Effects};
false when Strategy == reject_publish ->
%% TODO: optimise as this case gets called for every command
%% pretty much
Expand All @@ -2059,12 +2063,12 @@ evaluate_limit(Index, Result, BeforeState,
{false, true} ->
%% we have moved below the lower limit
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
{State0#?STATE{enqueuers = Enqs}, Effects};
_ ->
{State0, Result, Effects0}
{State0, Effects0}
end;
false ->
{State0, Result, Effects0}
{State0, Effects0}
end.

unblock_enqueuers(Enqs0, Effects0) ->
Expand Down
45 changes: 45 additions & 0 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ all_tests() ->
subscribe_redelivery_count,
message_bytes_metrics,
queue_length_limit_drop_head,
queue_length_bytes_limit_drop_head,
queue_length_limit_reject_publish,
queue_length_limit_policy_cleared,
subscribe_redelivery_limit,
Expand Down Expand Up @@ -3669,6 +3670,50 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).

queue_length_bytes_limit_drop_head(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
DLQ = <<"dead letter queue">>,

?assertEqual({'queue.declare_ok', DLQ, 0, 0},
declare(Ch, DLQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, <<"drop-head">>},
{<<"x-max-length-bytes">>, long, 1000},
{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DLQ}])),

LargePayload = binary:copy(<<"x">>, 1500),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m1">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = <<"m2">>}),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{payload = LargePayload}),
wait_for_consensus(QQ, Config),
wait_for_consensus(DLQ, Config),
RaName = ra_name(DLQ),
wait_for_messages_ready(Servers, RaName, 3),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = LargePayload}},
amqp_channel:call(Ch, #'basic.get'{queue = DLQ,
no_ack = true})),

[?assertEqual(#'queue.delete_ok'{message_count = 0},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- [QQ, DLQ]].

queue_length_limit_reject_publish(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

Expand Down
11 changes: 11 additions & 0 deletions release-notes/4.2.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## RabbitMQ 4.2.1

RabbitMQ `4.2.1` is a maintenance release in the `4.2.x` [release series](https://www.rabbitmq.com/release-information).

### Core Server

#### Bug Fixes

* Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order.

GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)
6 changes: 6 additions & 0 deletions release-notes/4.3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ compared to other versions.

### Core Server

#### Bug Fixes

* Quorum queue at-most-once dead lettering for the overflow behaviour `drop-head` now happens in the correct order.

GitHub issue: [#14926](https://github.com/rabbitmq/rabbitmq-server/pull/14926)

#### Enhancements

TBD
Expand Down
Loading