Skip to content

Commit bf531fd

Browse files
author
Daniil Fedotov
committed
Add configurable queue overflow strategy
When a delivery would overflow a queue, the queue can either reject the delivery or drop messages from its head. Configure overflow as `reject_publish` to reject deliveries, or as `drop_head` (the default setting) to drop from the head. Messages which will be rejected should still be reported as routed, so that mandatory expectations do not accumulate on the channel side. Slave nodes only confirm when a message has been published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When a new master is promoted, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447]
1 parent 7e64d48 commit bf531fd

11 files changed

+410
-38
lines changed

src/dtree.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
-module(dtree).
3434

35-
-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
35+
-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
3636
is_defined/2, is_empty/1, smallest/1, size/1]).
3737

3838
%%----------------------------------------------------------------------------
@@ -50,6 +50,7 @@
5050
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
5151
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
5252
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
53+
-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
5354
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
5455
-spec drop(pk(), ?MODULE()) -> ?MODULE().
5556
-spec is_defined(sk(), ?MODULE()) -> boolean().
@@ -107,6 +108,26 @@ take(SK, {P, S}) ->
107108
{KVs, {P1, gb_trees:delete(SK, S)}}
108109
end.
109110

111+
%% Remove the entry stored under the primary key PK and detach PK from
%% every secondary key recorded for that entry. A secondary key that is
%% left without any primary keys is removed from the secondary index.
%% Returns {[{PK, Value}], NewTree} when PK is present, and
%% {[], Tree} with the tree untouched when it is not.
take_one(PK, {P, S} = Tree) ->
    case gb_trees:lookup(PK, P) of
        none ->
            {[], Tree};
        {value, {SKS, Value}} ->
            %% Strip PK from the primary-key set of one secondary key,
            %% deleting the secondary key once its set becomes empty.
            Detach =
                fun (SK, Index) ->
                        {value, PKS0} = gb_trees:lookup(SK, Index),
                        PKS = gb_sets:delete(PK, PKS0),
                        case gb_sets:is_empty(PKS) of
                            false -> gb_trees:update(SK, PKS, Index);
                            true  -> gb_trees:delete(SK, Index)
                        end
                end,
            P1 = gb_trees:delete(PK, P),
            S1 = gb_sets:fold(Detach, S, SKS),
            {[{PK, Value}], {P1, S1}}
    end.
130+
110131
%% Drop all entries which contain the given secondary key, returning
111132
%% the primary-key/value pairs of these entries. It is ok for the
112133
%% given secondary key to not exist.

src/rabbit_amqqueue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ declare_args() ->
576576
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
577577
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
578578
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
579+
{<<"x-overflow">>, fun check_overflow/2},
579580
{<<"x-queue-mode">>, fun check_queue_mode/2}].
580581

581582
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
@@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
623624
check_dlxrk_arg({Type, _}, _Args) ->
624625
{error, {unacceptable_type, Type}}.
625626

627+
%% Validate the value of the <<"x-overflow">> queue argument.
%% Only the longstr values <<"drop_head">> and <<"reject_publish">>
%% are accepted; any other longstr yields {error, invalid_overflow} and
%% any other field type yields {error, {unacceptable_type, Type}}.
check_overflow({longstr, <<"drop_head">>}, _Args) ->
    ok;
check_overflow({longstr, <<"reject_publish">>}, _Args) ->
    ok;
check_overflow({longstr, _Other}, _Args) ->
    {error, invalid_overflow};
check_overflow({Type, _}, _Args) ->
    {error, {unacceptable_type, Type}}.
634+
626635
check_queue_mode({longstr, Val}, _Args) ->
627636
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
628637
true -> ok;

src/rabbit_amqqueue_process.erl

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@
7979
max_length,
8080
%% max length in bytes, if configured
8181
max_bytes,
82+
%% an action to perform if queue is to be over a limit,
83+
%% can be either drop_head (default) or reject_publish
84+
overflow,
8285
%% when policies change, this version helps queue
8386
%% determine what previously scheduled/set up state to ignore,
8487
%% e.g. message expiration messages from previously set up timers
@@ -158,7 +161,8 @@ init_state(Q) ->
158161
senders = pmon:new(delegate),
159162
msg_id_to_channel = gb_trees:empty(),
160163
status = running,
161-
args_policy_version = 0},
164+
args_policy_version = 0,
165+
overflow = drop_head},
162166
rabbit_event:init_stats_timer(State, #q.stats_timer).
163167

164168
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -259,7 +263,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
259263
msg_id_to_channel = MTC},
260264
State2 = process_args_policy(State1),
261265
State3 = lists:foldl(fun (Delivery, StateN) ->
262-
deliver_or_enqueue(Delivery, true, StateN)
266+
maybe_deliver_or_enqueue(Delivery, true, StateN)
263267
end, State2, Deliveries),
264268
notify_decorators(startup, State3),
265269
State3.
@@ -377,6 +381,7 @@ process_args_policy(State = #q{q = Q,
377381
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
378382
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
379383
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
384+
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
380385
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
381386
drop_expired_msgs(
382387
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
@@ -420,6 +425,12 @@ init_max_bytes(MaxBytes, State) ->
420425
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
421426
State1.
422427

428+
%% Apply the resolved overflow setting to the queue state.
%% The setting arrives as a binary and is stored in #q.overflow as the
%% atom of the same name; binary_to_existing_atom/2 is used so that no
%% new atoms are ever created from the incoming value.
%% NOTE(review): when the setting is absent (undefined) the state is
%% returned unchanged, so a previously applied strategy is retained
%% rather than reset to the default - confirm this is intended.
init_overflow(undefined, State) ->
    State;
init_overflow(Overflow, State) ->
    %% TODO maybe drop head
    Strategy = binary_to_existing_atom(Overflow, utf8),
    State#q{overflow = Strategy}.
433+
423434
init_queue_mode(undefined, State) ->
424435
State;
425436
init_queue_mode(Mode, State = #q {backing_queue = BQ,
@@ -620,12 +631,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
620631
State#q{consumers = Consumers})}
621632
end.
622633

634+
%% Entry point for an incoming delivery: decide between refusing it and
%% handing it to deliver_or_enqueue/3.
%% The delivery is refused via nack_publish_no_space/3 only when the
%% overflow strategy is reject_publish and will_overflow/2 reports that
%% accepting it would exceed a limit. In every other case the delivery
%% is enqueued as usual (and the head may be dropped afterwards).
maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
    send_mandatory(Delivery), %% must do this before confirms
    WouldOverflow = will_overflow(Delivery, State),
    case Overflow of
        reject_publish when WouldOverflow ->
            %% No room and the queue is configured to refuse publishes
            nack_publish_no_space(Delivery, Delivered, State);
        _ ->
            %% Accept the message; dropping from the head is handled later
            deliver_or_enqueue(Delivery, Delivered, State)
    end.
644+
623645
deliver_or_enqueue(Delivery = #delivery{message = Message,
624646
sender = SenderPid,
625647
flow = Flow},
626648
Delivered, State = #q{backing_queue = BQ,
627649
backing_queue_state = BQS}) ->
628-
send_mandatory(Delivery), %% must do this before confirms
629650
{Confirm, State1} = send_or_record_confirm(Delivery, State),
630651
Props = message_properties(Message, Confirm, State1),
631652
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -643,6 +664,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
643664
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
644665
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
645666
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
667+
646668
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
647669
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
648670
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
@@ -664,7 +686,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
664686
maybe_drop_head(State = #q{max_length = undefined,
665687
max_bytes = undefined}) ->
666688
{false, State};
667-
maybe_drop_head(State) ->
689+
maybe_drop_head(State = #q{overflow = reject_publish}) ->
690+
{false, State};
691+
maybe_drop_head(State = #q{overflow = drop_head}) ->
668692
maybe_drop_head(false, State).
669693

670694
maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
@@ -683,6 +707,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
683707
{AlreadyDropped, State}
684708
end.
685709

710+
%% Handle a delivery that is being refused for lack of space.
%% When the delivery did not request a confirm there is nothing to
%% report and the state is returned untouched. When it did, the
%% delivery is passed through discard/4 and the sender is cast a
%% {reject_publish, MsgSeqNo, QueuePid} message.
nack_publish_no_space(#delivery{confirm = false}, _Delivered, State) ->
    State;
nack_publish_no_space(Delivery = #delivery{confirm    = true,
                                           sender     = Sender,
                                           msg_seq_no = SeqNo},
                      _Delivered,
                      State = #q{backing_queue       = BQ,
                                 backing_queue_state = BQS0,
                                 msg_id_to_channel   = MTC0}) ->
    {BQS, MTC} = discard(Delivery, BQ, BQS0, MTC0),
    gen_server2:cast(Sender, {reject_publish, SeqNo, self()}),
    State#q{backing_queue_state = BQS,
            msg_id_to_channel   = MTC}.
723+
724+
%% Predict whether accepting the delivery would push the queue past its
%% configured limits. Returns false straight away when neither
%% max_length nor max_bytes is set. Otherwise compares the queue length
%% plus one against max_length, and the ready message bytes plus the
%% size of this message's payload against max_bytes.
will_overflow(_Delivery, #q{max_length = undefined,
                            max_bytes  = undefined}) ->
    false;
will_overflow(#delivery{message = Msg},
              #q{max_length          = MaxLen,
                 max_bytes           = MaxBytes,
                 backing_queue       = BQ,
                 backing_queue_state = BQS}) ->
    LenAfter = BQ:len(BQS) + 1,
    #basic_message{content = #content{payload_fragments_rev = Fragments}} = Msg,
    PayloadBytes = iolist_size(Fragments),
    BytesAfter = BQ:info(message_bytes_ready, BQS) + PayloadBytes,
    %% A limit that is still undefined can never be exceeded here: in
    %% Erlang term order every number compares lower than any atom.
    (LenAfter > MaxLen) orelse (BytesAfter > MaxBytes).
738+
686739
over_max_length(#q{max_length = MaxLen,
687740
max_bytes = MaxBytes,
688741
backing_queue = BQ,
@@ -1242,8 +1295,10 @@ handle_cast({run_backing_queue, Mod, Fun},
12421295
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
12431296
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
12441297

1245-
handle_cast({deliver, Delivery = #delivery{sender = Sender,
1246-
flow = Flow}, SlaveWhenPublished},
1298+
handle_cast({deliver,
1299+
Delivery = #delivery{sender = Sender,
1300+
flow = Flow},
1301+
SlaveWhenPublished},
12471302
State = #q{senders = Senders}) ->
12481303
Senders1 = case Flow of
12491304
%% In both credit_flow:ack/1 we are acking messages to the channel
@@ -1258,7 +1313,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
12581313
noflow -> Senders
12591314
end,
12601315
State1 = State#q{senders = Senders1},
1261-
noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
1316+
noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
12621317
%% [0] The second ack is since the channel thought we were a slave at
12631318
%% the time it published this message, so it used two credits (see
12641319
%% rabbit_amqqueue:deliver/2).

src/rabbit_channel.erl

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@
136136
%% a list of tags for published messages that were
137137
%% delivered but are yet to be confirmed to the client
138138
confirmed,
139+
%% a list of tags for published messages that were
140+
%% rejected but are yet to be sent to the client
141+
rejected,
139142
%% a dtree used to track outstanding notifications
140143
%% for messages published as mandatory
141144
mandatory,
@@ -399,6 +402,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
399402
confirm_enabled = false,
400403
publish_seqno = 1,
401404
unconfirmed = dtree:empty(),
405+
rejected = [],
402406
confirmed = [],
403407
mandatory = dtree:empty(),
404408
capabilities = Capabilities,
@@ -429,6 +433,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
429433
prioritise_cast(Msg, _Len, _State) ->
430434
case Msg of
431435
{confirm, _MsgSeqNos, _QPid} -> 5;
436+
{reject_publish, _MsgSeqNos, _QPid} -> 5;
432437
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
433438
_ -> 0
434439
end.
@@ -578,6 +583,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
578583
%% NB: don't call noreply/1 since we don't want to send confirms.
579584
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
580585

586+
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
587+
%% It does not matter which queue rejected the message,
588+
%% if any queue rejected it - it should not be confirmed.
589+
{MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
590+
%% NB: don't call noreply/1 since we don't want to send confirms.
591+
noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
592+
581593
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
582594
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
583595
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -601,7 +613,7 @@ handle_info(emit_stats, State) ->
601613
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
602614
%% NB: don't call noreply/1 since we don't want to kick off the
603615
%% stats timer.
604-
{noreply, send_confirms(State1), hibernate};
616+
{noreply, send_confirms_and_nacks(State1), hibernate};
605617

606618
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
607619
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -661,10 +673,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
661673

662674
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
663675

664-
next_state(State) -> ensure_stats_timer(send_confirms(State)).
676+
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
665677

666-
noreply_coalesce(State = #ch{confirmed = C}) ->
667-
Timeout = case C of [] -> hibernate; _ -> 0 end,
678+
noreply_coalesce(State = #ch{confirmed = C, rejected = R}) ->
679+
Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end,
668680
{noreply, ensure_stats_timer(State), Timeout}.
669681

670682
ensure_stats_timer(State) ->
@@ -798,7 +810,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
798810
RoutingKey,
799811
Permission) ->
800812
Resource = Name#resource{kind = topic},
801-
Timeout = get_operation_timeout(),
813+
Timeout = get_operation_timeout(),
802814
AmqpParams = case ConnPid of
803815
none ->
804816
%% Called from outside the channel by mgmt API
@@ -942,6 +954,15 @@ maybe_set_fast_reply_to(
942954
maybe_set_fast_reply_to(C, _State) ->
943955
C.
944956

957+
%% Remember a batch of rejected publishes so they can be sent out later.
%% An empty batch leaves the state untouched. Otherwise the batch is
%% pushed onto the rejected list, and if a transaction is in progress
%% (tx is anything other than none) it is marked as failed.
record_rejects([], State) ->
    State;
record_rejects(MXs, State = #ch{rejected = Rejected, tx = none}) ->
    State#ch{rejected = [MXs | Rejected], tx = none};
record_rejects(MXs, State = #ch{rejected = Rejected}) ->
    State#ch{rejected = [MXs | Rejected], tx = failed}.
965+
945966
record_confirms([], State) ->
946967
State;
947968
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -1846,21 +1867,23 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
18461867
send_nacks(_, State) ->
18471868
maybe_complete_tx(State#ch{tx = failed}).
18481869

1849-
send_confirms(State = #ch{tx = none, confirmed = []}) ->
1870+
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
18501871
State;
1851-
send_confirms(State = #ch{tx = none, confirmed = C}) ->
1872+
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
18521873
case rabbit_node_monitor:pause_partition_guard() of
1853-
ok -> MsgSeqNos =
1874+
ok -> ConfirmMsgSeqNos =
18541875
lists:foldl(
18551876
fun ({MsgSeqNo, XName}, MSNs) ->
18561877
?INCR_STATS([{exchange_stats, XName, 1}],
18571878
confirm, State),
18581879
[MsgSeqNo | MSNs]
18591880
end, [], lists:append(C)),
1860-
send_confirms(MsgSeqNos, State#ch{confirmed = []});
1881+
State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
1882+
%% TODO: msg seq nos, same as for confirms.
1883+
send_nacks(lists:append(R), State1#ch{rejected = []});
18611884
pausing -> State
18621885
end;
1863-
send_confirms(State) ->
1886+
send_confirms_and_nacks(State) ->
18641887
case rabbit_node_monitor:pause_partition_guard() of
18651888
ok -> maybe_complete_tx(State);
18661889
pausing -> State

src/rabbit_mirror_queue_slave.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
302302
%% We are acking messages to the channel process that sent us
303303
%% the message delivery. See
304304
%% rabbit_amqqueue_process:handle_ch_down for more info.
305+
%% TODO: reject publishes
305306
maybe_flow_ack(Sender, Flow),
306307
noreply(maybe_enqueue_message(Delivery, State));
307308

src/rabbit_policies.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ register() ->
4242
{policy_validator, <<"max-length">>},
4343
{policy_validator, <<"max-length-bytes">>},
4444
{policy_validator, <<"queue-mode">>},
45+
{policy_validator, <<"overflow">>},
4546
{operator_policy_validator, <<"expires">>},
4647
{operator_policy_validator, <<"message-ttl">>},
4748
{operator_policy_validator, <<"max-length">>},
@@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) ->
104105
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
105106
ok;
106107
validate_policy0(<<"queue-mode">>, Value) ->
107-
{error, "~p is not a valid queue-mode value", [Value]}.
108+
{error, "~p is not a valid queue-mode value", [Value]};
109+
validate_policy0(<<"overflow">>, <<"drop_head">>) ->
110+
ok;
111+
validate_policy0(<<"overflow">>, <<"reject_publish">>) ->
112+
ok;
113+
validate_policy0(<<"overflow">>, Value) ->
114+
{error, "~p is not a valid overflow value", [Value]}.
108115

109116
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
110117
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);

test/clustering_management_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -704,7 +704,7 @@ assert_failure(Fun) ->
704704
{error_string, Reason} -> Reason;
705705
{badrpc, {'EXIT', Reason}} -> Reason;
706706
{badrpc_multi, Reason, _Nodes} -> Reason;
707-
Other -> exit({expected_failure, Other})
707+
Other -> error({expected_failure, Other})
708708
end.
709709

710710
stop_app(Node) ->

0 commit comments

Comments
 (0)