Skip to content

Commit e81feb5

Browse files
committed
QQ: a delivery-limit of -1 disables the delivery limit.
For cases where users want to live a bit more dangerously this commit maps a delivery limit of -1 (or any negative value) such that it disables the delivery limit and restores the 3.13.x behaviour.
1 parent 61f53e2 commit e81feb5

File tree

9 files changed

+226
-51
lines changed

9 files changed

+226
-51
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,14 @@ update_config(Conf, State) ->
195195
Overflow = maps:get(overflow_strategy, Conf, drop_head),
196196
MaxLength = maps:get(max_length, Conf, undefined),
197197
MaxBytes = maps:get(max_bytes, Conf, undefined),
198-
DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
198+
DeliveryLimit = case maps:get(delivery_limit, Conf, undefined) of
199+
DL when is_number(DL) andalso
200+
DL < 0 ->
201+
undefined;
202+
DL ->
203+
DL
204+
end,
205+
199206
Expires = maps:get(expires, Conf, undefined),
200207
MsgTTL = maps:get(msg_ttl, Conf, undefined),
201208
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
@@ -615,24 +622,25 @@ apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
615622
checkout(Meta, State0, State, Effects);
616623
apply(_, {nodedown, _Node}, State) ->
617624
{State, ok};
618-
apply(#{index := _Idx} = Meta, #purge_nodes{nodes = Nodes}, State0) ->
625+
apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
619626
{State, Effects} = lists:foldl(fun(Node, {S, E}) ->
620627
purge_node(Meta, Node, S, E)
621628
end, {State0, []}, Nodes),
622629
{State, ok, Effects};
623-
apply(#{index := _Idx} = Meta,
624-
#update_config{config = #{dead_letter_handler := NewDLH} = Conf},
630+
apply(Meta,
631+
#update_config{config = #{} = Conf},
625632
#?STATE{cfg = #cfg{dead_letter_handler = OldDLH,
626633
resource = QRes},
627634
dlx = DlxState0} = State0) ->
635+
NewDLH = maps:get(dead_letter_handler, Conf, OldDLH),
628636
{DlxState, Effects0} = rabbit_fifo_dlx:update_config(OldDLH, NewDLH, QRes,
629637
DlxState0),
630638
State1 = update_config(Conf, State0#?STATE{dlx = DlxState}),
631639
checkout(Meta, State0, State1, Effects0);
632640
apply(Meta, {machine_version, FromVersion, ToVersion}, V0State) ->
633641
State = convert(Meta, FromVersion, ToVersion, V0State),
634642
{State, ok, [{aux, {dlx, setup}}]};
635-
apply(#{index := _IncomingRaftIdx} = Meta, {dlx, _} = Cmd,
643+
apply(Meta, {dlx, _} = Cmd,
636644
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
637645
dlx = DlxState0} = State0) ->
638646
{DlxState, Effects0} = rabbit_fifo_dlx:apply(Meta, Cmd, DLH, DlxState0),

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@
222222
max_bytes => non_neg_integer(),
223223
overflow_strategy => drop_head | reject_publish,
224224
single_active_consumer_on => boolean(),
225-
delivery_limit => non_neg_integer(),
225+
delivery_limit => non_neg_integer() | -1,
226226
expires => non_neg_integer(),
227227
msg_ttl => non_neg_integer(),
228228
created => non_neg_integer()

deps/rabbit/src/rabbit_policies.erl

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ validate_policy0(<<"overflow">>, Value) ->
165165
{error, "~tp is not a valid overflow value", [Value]};
166166

167167
validate_policy0(<<"delivery-limit">>, Value)
168-
when is_integer(Value), Value >= 0 ->
168+
when is_integer(Value) ->
169169
ok;
170170
validate_policy0(<<"delivery-limit">>, Value) ->
171171
{error, "~tp is not a valid delivery limit", [Value]};
@@ -208,14 +208,35 @@ validate_policy0(<<"stream-filter-size-bytes">>, Value)
208208
validate_policy0(<<"stream-filter-size-bytes">>, Value) ->
209209
{error, "~tp is not a valid filter size. Valid range is 16-255", [Value]}.
210210

211-
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
212-
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
213-
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
214-
merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) -> min(Val, OpVal);
215-
merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal);
216-
merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
217-
merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal);
218-
merge_policy_value(<<"queue-version">>, _Val, OpVal) -> OpVal;
219-
merge_policy_value(<<"overflow">>, _Val, OpVal) -> OpVal;
220-
%% use operator policy value for booleans
221-
merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal.
211+
merge_policy_value(<<"message-ttl">>, Val, OpVal) ->
212+
min(Val, OpVal);
213+
merge_policy_value(<<"max-length">>, Val, OpVal) ->
214+
min(Val, OpVal);
215+
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) ->
216+
min(Val, OpVal);
217+
merge_policy_value(<<"max-in-memory-length">>, Val, OpVal) ->
218+
min(Val, OpVal);
219+
merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) ->
220+
min(Val, OpVal);
221+
merge_policy_value(<<"expires">>, Val, OpVal) ->
222+
min(Val, OpVal);
223+
merge_policy_value(<<"delivery-limit">>, Val, OpVal) ->
224+
case (is_integer(Val) andalso Val < 0) orelse
225+
(is_integer(OpVal) andalso OpVal < 0) of
226+
true ->
227+
%% one of the policies define an unlimited delivery-limit (negative value)
228+
%% choose the more conservative value
229+
max(Val, OpVal);
230+
false ->
231+
%% else choose the lower value
232+
min(Val, OpVal)
233+
end;
234+
merge_policy_value(<<"queue-version">>, _Val, OpVal) ->
235+
OpVal;
236+
merge_policy_value(<<"overflow">>, _Val, OpVal) ->
237+
OpVal;
238+
merge_policy_value(_Key, Val, OpVal)
239+
when is_boolean(Val) andalso
240+
is_boolean(OpVal) ->
241+
%% use operator policy value for booleans
242+
OpVal.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@
8282
file_handle_release_reservation/0]).
8383

8484
-ifdef(TEST).
85-
-export([filter_promotable/2]).
85+
-export([filter_promotable/2,
86+
ra_machine_config/1]).
8687
-endif.
8788

8889
-import(rabbit_queue_type_util, [args_policy_lookup/3,
@@ -322,7 +323,8 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
322323
OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q),
323324
Overflow = overflow(OverflowBin, drop_head, QName),
324325
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
325-
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun min/2, Q) of
326+
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>,
327+
fun resolve_delivery_limit/2, Q) of
326328
undefined ->
327329
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
328330
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
@@ -346,6 +348,12 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
346348
msg_ttl => MsgTTL
347349
}.
348350

351+
resolve_delivery_limit(PolVal, ArgVal)
352+
when PolVal < 0 orelse ArgVal < 0 ->
353+
max(PolVal, ArgVal);
354+
resolve_delivery_limit(PolVal, ArgVal) ->
355+
min(PolVal, ArgVal).
356+
349357
policy_has_precedence(Policy, _QueueArg) ->
350358
Policy.
351359

@@ -1898,8 +1906,6 @@ make_mutable_config(Q) ->
18981906
#{tick_timeout => TickTimeout,
18991907
ra_event_formatter => Formatter}.
19001908

1901-
1902-
19031909
get_nodes(Q) when ?is_amqqueue(Q) ->
19041910
#{nodes := Nodes} = amqqueue:get_type_state(Q),
19051911
Nodes.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ all_tests() ->
152152
queue_length_limit_drop_head,
153153
queue_length_limit_reject_publish,
154154
subscribe_redelivery_limit,
155+
subscribe_redelivery_limit_disable,
155156
subscribe_redelivery_limit_many,
156157
subscribe_redelivery_policy,
157158
subscribe_redelivery_limit_with_dead_letter,
@@ -2495,8 +2496,8 @@ subscribe_redelivery_count(Config) ->
24952496
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
24962497
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
24972498
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2498-
multiple = false,
2499-
requeue = true})
2499+
multiple = false,
2500+
requeue = true})
25002501
after 5000 ->
25012502
exit(basic_deliver_timeout)
25022503
end,
@@ -2508,8 +2509,8 @@ subscribe_redelivery_count(Config) ->
25082509
ct:pal("H1 ~p", [H1]),
25092510
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
25102511
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
2511-
multiple = false,
2512-
requeue = true})
2512+
multiple = false,
2513+
requeue = true})
25132514
after 5000 ->
25142515
flush(1),
25152516
exit(basic_deliver_timeout_2)
@@ -2521,7 +2522,7 @@ subscribe_redelivery_count(Config) ->
25212522
#amqp_msg{props = #'P_basic'{headers = H2}}} ->
25222523
?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
25232524
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
2524-
multiple = false}),
2525+
multiple = false}),
25252526
ct:pal("wait_for_messages_ready", []),
25262527
wait_for_messages_ready(Servers, RaName, 0),
25272528
ct:pal("wait_for_messages_pending_ack", []),
@@ -2551,8 +2552,8 @@ subscribe_redelivery_limit(Config) ->
25512552
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
25522553
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
25532554
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2554-
multiple = false,
2555-
requeue = true})
2555+
multiple = false,
2556+
requeue = true})
25562557
end,
25572558

25582559
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
@@ -2562,8 +2563,8 @@ subscribe_redelivery_limit(Config) ->
25622563
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
25632564
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
25642565
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
2565-
multiple = false,
2566-
requeue = true})
2566+
multiple = false,
2567+
requeue = true})
25672568
end,
25682569

25692570
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
@@ -2574,6 +2575,51 @@ subscribe_redelivery_limit(Config) ->
25742575
ok
25752576
end.
25762577

2578+
subscribe_redelivery_limit_disable(Config) ->
2579+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2580+
2581+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2582+
QQ = ?config(queue_name, Config),
2583+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2584+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
2585+
{<<"x-delivery-limit">>, long, -1}])),
2586+
publish(Ch, QQ),
2587+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
2588+
subscribe(Ch, QQ, false),
2589+
2590+
DCHeader = <<"x-delivery-count">>,
2591+
receive
2592+
{#'basic.deliver'{delivery_tag = DeliveryTag,
2593+
redelivered = false},
2594+
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
2595+
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
2596+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2597+
multiple = false,
2598+
requeue = true})
2599+
end,
2600+
2601+
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
2602+
%% set an operator policy, this should always win
2603+
ok = rabbit_ct_broker_helpers:set_operator_policy(
2604+
Config, 0, <<"delivery-limit">>, QQ, <<"queues">>,
2605+
[{<<"delivery-limit">>, 0}]),
2606+
2607+
receive
2608+
{#'basic.deliver'{delivery_tag = DeliveryTag2,
2609+
redelivered = true},
2610+
#amqp_msg{props = #'P_basic'{}}} ->
2611+
% ?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
2612+
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
2613+
multiple = false,
2614+
requeue = true})
2615+
after 5000 ->
2616+
flush(1),
2617+
ct:fail("message did not arrive as expected")
2618+
end,
2619+
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
2620+
ok = rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"delivery-limit">>),
2621+
ok.
2622+
25772623
%% Test that consumer credit is increased correctly.
25782624
subscribe_redelivery_limit_many(Config) ->
25792625
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2637,8 +2683,8 @@ subscribe_redelivery_policy(Config) ->
26372683
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
26382684
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
26392685
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2640-
multiple = false,
2641-
requeue = true})
2686+
multiple = false,
2687+
requeue = true})
26422688
end,
26432689

26442690
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
@@ -2648,8 +2694,8 @@ subscribe_redelivery_policy(Config) ->
26482694
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
26492695
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
26502696
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
2651-
multiple = false,
2652-
requeue = true})
2697+
multiple = false,
2698+
requeue = true})
26532699
end,
26542700

26552701
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
@@ -2687,8 +2733,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
26872733
#amqp_msg{props = #'P_basic'{headers = H0}}} ->
26882734
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
26892735
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2690-
multiple = false,
2691-
requeue = true})
2736+
multiple = false,
2737+
requeue = true})
26922738
end,
26932739

26942740
wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
@@ -2698,8 +2744,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
26982744
#amqp_msg{props = #'P_basic'{headers = H1}}} ->
26992745
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
27002746
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
2701-
multiple = false,
2702-
requeue = true})
2747+
multiple = false,
2748+
requeue = true})
27032749
end,
27042750

27052751
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
@@ -2726,8 +2772,8 @@ consume_redelivery_count(Config) ->
27262772
no_ack = false}),
27272773
?assertMatch(undefined, rabbit_basic:header(DCHeader, H0)),
27282774
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2729-
multiple = false,
2730-
requeue = true}),
2775+
multiple = false,
2776+
requeue = true}),
27312777
%% wait for requeuing
27322778
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
27332779
redelivered = true},
@@ -2736,8 +2782,8 @@ consume_redelivery_count(Config) ->
27362782

27372783
?assertMatch({DCHeader, _, 1}, rabbit_basic:header(DCHeader, H1)),
27382784
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
2739-
multiple = false,
2740-
requeue = true}),
2785+
multiple = false,
2786+
requeue = true}),
27412787

27422788
{#'basic.get_ok'{delivery_tag = DeliveryTag2,
27432789
redelivered = true},
@@ -2746,8 +2792,8 @@ consume_redelivery_count(Config) ->
27462792
no_ack = false}),
27472793
?assertMatch({DCHeader, _, 2}, rabbit_basic:header(DCHeader, H2)),
27482794
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
2749-
multiple = false,
2750-
requeue = true}),
2795+
multiple = false,
2796+
requeue = true}),
27512797
ok.
27522798

27532799
message_bytes_metrics(Config) ->
@@ -2784,8 +2830,8 @@ message_bytes_metrics(Config) ->
27842830
{#'basic.deliver'{delivery_tag = DeliveryTag,
27852831
redelivered = false}, _} ->
27862832
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
2787-
multiple = false,
2788-
requeue = false}),
2833+
multiple = false,
2834+
requeue = false}),
27892835
wait_for_messages_ready(Servers, RaName, 0),
27902836
wait_for_messages_pending_ack(Servers, RaName, 0),
27912837
rabbit_ct_helpers:await_condition(

deps/rabbit/test/rabbit_fifo_SUITE.erl

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2137,7 +2137,6 @@ reject_publish_applied_after_limit_test(Config) ->
21372137
queue_resource => QName,
21382138
max_length => 2,
21392139
overflow_strategy => reject_publish,
2140-
max_in_memory_length => 0,
21412140
dead_letter_handler => undefined
21422141
},
21432142
{State5, ok, Efx1} = apply(meta(Config, 5), rabbit_fifo:make_update_config(Conf), State4),
@@ -2148,6 +2147,31 @@ reject_publish_applied_after_limit_test(Config) ->
21482147
apply(meta(Config, 1), make_register_enqueuer(Pid2), State5),
21492148
ok.
21502149

2150+
update_config_delivery_limit_test(Config) ->
2151+
QName = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
2152+
InitConf = #{name => ?FUNCTION_NAME,
2153+
queue_resource => QName,
2154+
delivery_limit => 20
2155+
},
2156+
State0 = init(InitConf),
2157+
?assertMatch(#{config := #{delivery_limit := 20}},
2158+
rabbit_fifo:overview(State0)),
2159+
2160+
%% A delivery limit of -1 (or any negative value) turns the delivery_limit
2161+
%% off
2162+
Conf = #{name => ?FUNCTION_NAME,
2163+
queue_resource => QName,
2164+
delivery_limit => -1,
2165+
dead_letter_handler => undefined
2166+
},
2167+
{State1, ok, _} = apply(meta(Config, ?LINE),
2168+
rabbit_fifo:make_update_config(Conf), State0),
2169+
2170+
?assertMatch(#{config := #{delivery_limit := undefined}},
2171+
rabbit_fifo:overview(State1)),
2172+
2173+
ok.
2174+
21512175
purge_nodes_test(Config) ->
21522176
Node = purged@node,
21532177
ThisNode = node(),

0 commit comments

Comments
 (0)