Skip to content

Commit a4b6025

Browse files
Merge pull request #1834 from rabbitmq/qq-queue-length-drop-head
Queue length limit by byte size and number of messages (Drop head only)
2 parents f1bb0e1 + 07bfd2b commit a4b6025

File tree

8 files changed

+766
-364
lines changed

8 files changed

+766
-364
lines changed

src/rabbit_fifo.erl

Lines changed: 331 additions & 234 deletions
Large diffs are not rendered by default.

src/rabbit_fifo_index.erl

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44
empty/0,
55
fetch/2,
66
append/3,
7+
update_if_present/3,
78
return/3,
89
delete/2,
910
size/1,
1011
smallest/1,
1112
next_key_after/2,
12-
map/2
13+
map/2,
14+
to_map/1
1315
]).
1416

1517
-include_lib("ra/include/ra.hrl").
@@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) ->
3638
-spec append(integer(), term(), state()) -> state().
3739
append(Key, Value,
3840
#?MODULE{data = Data,
39-
smallest = Smallest,
40-
largest = Largest} = State)
41+
smallest = Smallest,
42+
largest = Largest} = State)
4143
when Key > Largest orelse Largest =:= undefined ->
4244
State#?MODULE{data = maps:put(Key, Value, Data),
43-
smallest = ra_lib:default(Smallest, Key),
44-
largest = Key}.
45+
smallest = ra_lib:default(Smallest, Key),
46+
largest = Key}.
47+
48+
-spec update_if_present(integer(), term(), state()) -> state().
49+
update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
50+
case Data of
51+
#{Key := _} ->
52+
State#?MODULE{data = maps:put(Key, Value, Data)};
53+
_ ->
54+
State
55+
end.
56+
4557

4658
-spec return(integer(), term(), state()) -> state().
4759
return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
@@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
7688
size(#?MODULE{data = Data}) ->
7789
maps:size(Data).
7890

91+
-spec to_map(state()) -> #{integer() => term()}.
92+
to_map(#?MODULE{data = Data}) ->
93+
Data.
94+
7995
-spec smallest(state()) -> undefined | {integer(), term()}.
8096
smallest(#?MODULE{smallest = undefined}) ->
8197
undefined;

src/rabbit_quorum_queue.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,15 @@ ra_machine(Q) ->
152152

153153
ra_machine_config(Q = #amqqueue{name = QName,
154154
pid = {Name, _}}) ->
155+
%% take the minimum value of the policy and the queue arg if present
156+
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
157+
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
155158
#{name => Name,
156159
queue_resource => QName,
157160
dead_letter_handler => dlx_mfa(Q),
158161
become_leader_handler => {?MODULE, become_leader, [QName]},
162+
max_length => MaxLength,
163+
max_bytes => MaxBytes,
159164
single_active_consumer_on => single_active_consumer_on(Q)}.
160165

161166
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
@@ -636,8 +641,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
636641

637642
%%----------------------------------------------------------------------------
638643
dlx_mfa(Q) ->
639-
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
640-
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
644+
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
645+
fun res_arg/2, Q), Q),
646+
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
647+
fun res_arg/2, Q),
641648
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
642649

643650
init_dlx(undefined, _Q) ->
@@ -829,9 +836,8 @@ qnode({_, Node}) ->
829836
Node.
830837

831838
check_invalid_arguments(QueueName, Args) ->
832-
Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>,
833-
<<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>,
834-
<<"x-queue-mode">>],
839+
Keys = [<<"x-expires">>, <<"x-message-ttl">>,
840+
<<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
835841
[case rabbit_misc:table_lookup(Args, Key) of
836842
undefined -> ok;
837843
_TypeVal -> rabbit_misc:protocol_error(

test/quorum_queue_SUITE.erl

Lines changed: 53 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
-import(quorum_queue_utils, [wait_for_messages_ready/3,
2424
wait_for_messages_pending_ack/3,
25+
wait_for_messages_total/3,
2526
dirty_query/3,
2627
ra_name/1]).
2728

@@ -37,6 +38,7 @@ all() ->
3738
groups() ->
3839
[
3940
{single_node, [], all_tests()},
41+
{single_node, [], memory_tests()},
4042
{unclustered, [], [
4143
{cluster_size_2, [], [add_member]}
4244
]},
@@ -51,6 +53,7 @@ groups() ->
5153
delete_member_not_found,
5254
delete_member]
5355
++ all_tests()},
56+
{cluster_size_2, [], memory_tests()},
5457
{cluster_size_3, [], [
5558
declare_during_node_down,
5659
simple_confirm_availability_on_leader_change,
@@ -61,7 +64,8 @@ groups() ->
6164
delete_declare,
6265
metrics_cleanup_on_leadership_takeover,
6366
metrics_cleanup_on_leader_crash,
64-
consume_in_minority]},
67+
consume_in_minority
68+
]},
6569
{cluster_size_5, [], [start_queue,
6670
start_queue_concurrent,
6771
quorum_cluster_size_3,
@@ -126,6 +130,11 @@ all_tests() ->
126130
consume_redelivery_count,
127131
subscribe_redelivery_count,
128132
message_bytes_metrics,
133+
queue_length_limit_drop_head
134+
].
135+
136+
memory_tests() ->
137+
[
129138
memory_alarm_rolls_wal
130139
].
131140

@@ -240,7 +249,9 @@ declare_args(Config) ->
240249

241250
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
242251
LQ = ?config(queue_name, Config),
243-
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
252+
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
253+
{<<"x-max-length">>, long, 2000},
254+
{<<"x-max-length-bytes">>, long, 2000}]),
244255
assert_queue_type(Server, LQ, quorum),
245256

246257
DQ = <<"classic-declare-args-q">>,
@@ -293,16 +304,6 @@ declare_invalid_args(Config) ->
293304
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
294305
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
295306
{<<"x-message-ttl">>, long, 2000}])),
296-
?assertExit(
297-
{{shutdown, {server_initiated_close, 406, _}}, _},
298-
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
299-
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
300-
{<<"x-max-length">>, long, 2000}])),
301-
?assertExit(
302-
{{shutdown, {server_initiated_close, 406, _}}, _},
303-
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
304-
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
305-
{<<"x-max-length-bytes">>, long, 2000}])),
306307

307308
?assertExit(
308309
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -314,7 +315,7 @@ declare_invalid_args(Config) ->
314315
{{shutdown, {server_initiated_close, 406, _}}, _},
315316
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
316317
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
317-
{<<"x-overflow">>, longstr, <<"drop-head">>}])),
318+
{<<"x-overflow">>, longstr, <<"reject-publish">>}])),
318319

319320
?assertExit(
320321
{{shutdown, {server_initiated_close, 406, _}}, _},
@@ -1422,7 +1423,7 @@ metrics_cleanup_on_leadership_takeover(Config) ->
14221423
_ -> false
14231424
end
14241425
end),
1425-
force_leader_change(Leader, Servers, QQ),
1426+
force_leader_change(Servers, QQ),
14261427
wait_until(fun () ->
14271428
[] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
14281429
[] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
@@ -2151,6 +2152,32 @@ memory_alarm_rolls_wal(Config) ->
21512152
timer:sleep(1000),
21522153
ok.
21532154

2155+
queue_length_limit_drop_head(Config) ->
2156+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2157+
2158+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2159+
QQ = ?config(queue_name, Config),
2160+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2161+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
2162+
{<<"x-max-length">>, long, 1}])),
2163+
2164+
RaName = ra_name(QQ),
2165+
ok = amqp_channel:cast(Ch,
2166+
#'basic.publish'{routing_key = QQ},
2167+
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
2168+
payload = <<"msg1">>}),
2169+
ok = amqp_channel:cast(Ch,
2170+
#'basic.publish'{routing_key = QQ},
2171+
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
2172+
payload = <<"msg2">>}),
2173+
wait_for_consensus(QQ, Config),
2174+
wait_for_messages_ready(Servers, RaName, 1),
2175+
wait_for_messages_pending_ack(Servers, RaName, 0),
2176+
wait_for_messages_total(Servers, RaName, 1),
2177+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
2178+
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
2179+
no_ack = true})).
2180+
21542181
%%----------------------------------------------------------------------------
21552182

21562183
declare(Ch, Q) ->
@@ -2201,6 +2228,9 @@ filter_queues(Expected, Got) ->
22012228
lists:member(K, Keys)
22022229
end, Got).
22032230

2231+
publish_many(Ch, Queue, Count) ->
2232+
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
2233+
22042234
publish(Ch, Queue) ->
22052235
ok = amqp_channel:cast(Ch,
22062236
#'basic.publish'{routing_key = Queue},
@@ -2268,14 +2298,16 @@ wait_until(Condition, N) ->
22682298
wait_until(Condition, N - 1)
22692299
end.
22702300

2271-
force_leader_change(Leader, Servers, Q) ->
2301+
2302+
force_leader_change([Server | _] = Servers, Q) ->
22722303
RaName = ra_name(Q),
2304+
{ok, _, {_, Leader}} = ra:members({RaName, Server}),
22732305
[F1, _] = Servers -- [Leader],
22742306
ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
22752307
case ra:members({RaName, Leader}) of
22762308
{ok, _, {_, Leader}} ->
22772309
%% Leader has been re-elected
2278-
force_leader_change(Leader, Servers, Q);
2310+
force_leader_change(Servers, Q);
22792311
{ok, _, _} ->
22802312
%% Leader has changed
22812313
ok
@@ -2297,3 +2329,8 @@ get_message_bytes(Leader, QRes) ->
22972329
_ ->
22982330
[]
22992331
end.
2332+
2333+
wait_for_consensus(Name, Config) ->
2334+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
2335+
RaName = ra_name(Name),
2336+
{ok, _, _} = ra:members({RaName, Server}).

test/quorum_queue_utils.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
-export([
66
wait_for_messages_ready/3,
77
wait_for_messages_pending_ack/3,
8+
wait_for_messages_total/3,
89
dirty_query/3,
910
ra_name/1
1011
]).
@@ -17,6 +18,10 @@ wait_for_messages_pending_ack(Servers, QName, Ready) ->
1718
wait_for_messages(Servers, QName, Ready,
1819
fun rabbit_fifo:query_messages_checked_out/1, 60).
1920

21+
wait_for_messages_total(Servers, QName, Total) ->
22+
wait_for_messages(Servers, QName, Total,
23+
fun rabbit_fifo:query_messages_total/1, 60).
24+
2025
wait_for_messages(Servers, QName, Number, Fun, 0) ->
2126
Msgs = dirty_query(Servers, QName, Fun),
2227
Totals = lists:map(fun(M) when is_map(M) ->
@@ -28,8 +33,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
2833
wait_for_messages(Servers, QName, Number, Fun, N) ->
2934
Msgs = dirty_query(Servers, QName, Fun),
3035
ct:pal("Got messages ~p", [Msgs]),
31-
case lists:all(fun(M) when is_map(M) ->
32-
maps:size(M) == Number;
36+
case lists:all(fun(C) when is_integer(C) ->
37+
C == Number;
3338
(_) ->
3439
false
3540
end, Msgs) of

test/rabbit_fifo_SUITE.erl

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ basics(Config) ->
124124
{ra_event, Frm, E} ->
125125
case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
126126
{internal, _, _, _FState7} ->
127-
ct:pal("unexpected event ~p~n", [E]),
128127
exit({unexpected_internal_event, E});
129128
{{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
130129
{ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
@@ -218,9 +217,7 @@ usage(Config) ->
218217
% force tick and usage stats emission
219218
ServerId ! tick_timeout,
220219
timer:sleep(50),
221-
% ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]),
222220
Use = rabbit_fifo:usage(element(1, ServerId)),
223-
ct:pal("Use ~w~n", [Use]),
224221
ra:stop_server(ServerId),
225222
?assert(Use > 0.0),
226223
ok.
@@ -300,6 +297,7 @@ returns_after_down(Config) ->
300297
Self ! checkout_done
301298
end),
302299
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
300+
timer:sleep(1000),
303301
% message should be available for dequeue
304302
{ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
305303
ra:stop_server(ServerId),
@@ -380,7 +378,6 @@ discard(Config) ->
380378
{ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
381379
receive
382380
{dead_letter, Letters} ->
383-
ct:pal("dead letters ~p~n", [Letters]),
384381
[{_, msg1}] = Letters,
385382
ok
386383
after 500 ->
@@ -481,16 +478,13 @@ test_queries(Config) ->
481478
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
482479
{ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
483480
{ok, {_, Ready}, _} = ra:local_query(ServerId,
484-
fun rabbit_fifo:query_messages_ready/1),
485-
?assertEqual(1, maps:size(Ready)),
486-
ct:pal("Ready ~w~n", [Ready]),
481+
fun rabbit_fifo:query_messages_ready/1),
482+
?assertEqual(1, Ready),
487483
{ok, {_, Checked}, _} = ra:local_query(ServerId,
488-
fun rabbit_fifo:query_messages_checked_out/1),
489-
?assertEqual(1, maps:size(Checked)),
490-
ct:pal("Checked ~w~n", [Checked]),
484+
fun rabbit_fifo:query_messages_checked_out/1),
485+
?assertEqual(1, Checked),
491486
{ok, {_, Processes}, _} = ra:local_query(ServerId,
492-
fun rabbit_fifo:query_processes/1),
493-
ct:pal("Processes ~w~n", [Processes]),
487+
fun rabbit_fifo:query_processes/1),
494488
?assertEqual(2, length(Processes)),
495489
P ! stop,
496490
ra:stop_server(ServerId),
@@ -565,7 +559,6 @@ process_ra_events(State, Acc, Wait) ->
565559
process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
566560
receive
567561
{ra_event, From, Evt} ->
568-
% ct:pal("ra event ~w~n", [Evt]),
569562
case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
570563
{internal, _, Actions, State} ->
571564
process_ra_events0(State, Acc, Actions0 ++ Actions,
@@ -588,7 +581,6 @@ discard_next_delivery(State0, Wait) ->
588581
discard_next_delivery(State, Wait);
589582
{{delivery, Tag, Msgs}, State1} ->
590583
MsgIds = [element(1, M) || M <- Msgs],
591-
ct:pal("discarding ~p", [Msgs]),
592584
{ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
593585
State1),
594586
State
@@ -605,7 +597,6 @@ return_next_delivery(State0, Wait) ->
605597
return_next_delivery(State, Wait);
606598
{{delivery, Tag, Msgs}, State1} ->
607599
MsgIds = [element(1, M) || M <- Msgs],
608-
ct:pal("returning ~p", [Msgs]),
609600
{ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
610601
State1),
611602
State

0 commit comments

Comments
 (0)