Skip to content

Commit 0d576e2

Browse files
Merge pull request #1374 from rabbitmq/rabbitmq-server-995
An option to discard messages if queue is full.
2 parents df25c13 + 635c518 commit 0d576e2

11 files changed

+427
-38
lines changed

src/dtree.erl

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
-module(dtree).
3434

35-
-export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
35+
-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
3636
is_defined/2, is_empty/1, smallest/1, size/1]).
3737

3838
%%----------------------------------------------------------------------------
@@ -50,6 +50,7 @@
5050
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
5151
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
5252
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
53+
-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
5354
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
5455
-spec drop(pk(), ?MODULE()) -> ?MODULE().
5556
-spec is_defined(sk(), ?MODULE()) -> boolean().
@@ -107,6 +108,26 @@ take(SK, {P, S}) ->
107108
{KVs, {P1, gb_trees:delete(SK, S)}}
108109
end.
109110

111+
%% Drop an entry with the primary key and clears secondary keys for this key,
112+
%% returning a list with a key-value pair as a result.
113+
%% If the primary key does not exist, returns an empty list.
114+
take_one(PK, {P, S}) ->
115+
case gb_trees:lookup(PK, P) of
116+
{value, {SKS, Value}} ->
117+
P1 = gb_trees:delete(PK, P),
118+
S1 = gb_sets:fold(
119+
fun(SK, Acc) ->
120+
{value, PKS} = gb_trees:lookup(SK, Acc),
121+
PKS1 = gb_sets:delete(PK, PKS),
122+
case gb_sets:is_empty(PKS1) of
123+
true -> gb_trees:delete(SK, Acc);
124+
false -> gb_trees:update(SK, PKS1, Acc)
125+
end
126+
end, S, SKS),
127+
{[{PK, Value}], {P1, S1}};
128+
none -> {[], {P, S}}
129+
end.
130+
110131
%% Drop all entries which contain the given secondary key, returning
111132
%% the primary-key/value pairs of these entries. It is ok for the
112133
%% given secondary key to not exist.

src/rabbit_amqqueue.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,7 @@ declare_args() ->
576576
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
577577
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
578578
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
579+
{<<"x-overflow">>, fun check_overflow/2},
579580
{<<"x-queue-mode">>, fun check_queue_mode/2}].
580581

581582
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
@@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
623624
check_dlxrk_arg({Type, _}, _Args) ->
624625
{error, {unacceptable_type, Type}}.
625626

627+
check_overflow({longstr, Val}, _Args) ->
628+
case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of
629+
true -> ok;
630+
false -> {error, invalid_overflow}
631+
end;
632+
check_overflow({Type, _}, _Args) ->
633+
{error, {unacceptable_type, Type}}.
634+
626635
check_queue_mode({longstr, Val}, _Args) ->
627636
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
628637
true -> ok;

src/rabbit_amqqueue_process.erl

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@
8080
max_length,
8181
%% max length in bytes, if configured
8282
max_bytes,
83+
%% an action to perform if queue is to be over a limit,
84+
%% can be either drop-head (default) or reject-publish
85+
overflow,
8386
%% when policies change, this version helps queue
8487
%% determine what previously scheduled/set up state to ignore,
8588
%% e.g. message expiration messages from previously set up timers
@@ -159,7 +162,8 @@ init_state(Q) ->
159162
senders = pmon:new(delegate),
160163
msg_id_to_channel = gb_trees:empty(),
161164
status = running,
162-
args_policy_version = 0},
165+
args_policy_version = 0,
166+
overflow = 'drop-head'},
163167
rabbit_event:init_stats_timer(State, #q.stats_timer).
164168

165169
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -260,7 +264,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
260264
msg_id_to_channel = MTC},
261265
State2 = process_args_policy(State1),
262266
State3 = lists:foldl(fun (Delivery, StateN) ->
263-
deliver_or_enqueue(Delivery, true, StateN)
267+
maybe_deliver_or_enqueue(Delivery, true, StateN)
264268
end, State2, Deliveries),
265269
notify_decorators(startup, State3),
266270
State3.
@@ -378,6 +382,7 @@ process_args_policy(State = #q{q = Q,
378382
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
379383
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
380384
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
385+
{<<"overflow">>, fun res_arg/2, fun init_overflow/2},
381386
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
382387
drop_expired_msgs(
383388
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
@@ -421,6 +426,18 @@ init_max_bytes(MaxBytes, State) ->
421426
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
422427
State1.
423428

429+
init_overflow(undefined, State) ->
430+
State;
431+
init_overflow(Overflow, State) ->
432+
OverflowVal = binary_to_existing_atom(Overflow, utf8),
433+
case OverflowVal of
434+
'drop-head' ->
435+
{_Dropped, State1} = maybe_drop_head(State#q{overflow = OverflowVal}),
436+
State1;
437+
_ ->
438+
State#q{overflow = OverflowVal}
439+
end.
440+
424441
init_queue_mode(undefined, State) ->
425442
State;
426443
init_queue_mode(Mode, State = #q {backing_queue = BQ,
@@ -621,12 +638,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
621638
State#q{consumers = Consumers})}
622639
end.
623640

641+
maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
642+
send_mandatory(Delivery), %% must do this before confirms
643+
case {will_overflow(Delivery, State), Overflow} of
644+
{true, 'reject-publish'} ->
645+
%% Drop publish and nack to publisher
646+
send_reject_publish(Delivery, Delivered, State);
647+
_ ->
648+
%% Enqueue and maybe drop head later
649+
deliver_or_enqueue(Delivery, Delivered, State)
650+
end.
651+
624652
deliver_or_enqueue(Delivery = #delivery{message = Message,
625653
sender = SenderPid,
626654
flow = Flow},
627655
Delivered, State = #q{backing_queue = BQ,
628656
backing_queue_state = BQS}) ->
629-
send_mandatory(Delivery), %% must do this before confirms
630657
{Confirm, State1} = send_or_record_confirm(Delivery, State),
631658
Props = message_properties(Message, Confirm, State1),
632659
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -644,6 +671,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
644671
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
645672
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
646673
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
674+
647675
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
648676
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
649677
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
@@ -665,7 +693,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
665693
maybe_drop_head(State = #q{max_length = undefined,
666694
max_bytes = undefined}) ->
667695
{false, State};
668-
maybe_drop_head(State) ->
696+
maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
697+
{false, State};
698+
maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
669699
maybe_drop_head(false, State).
670700

671701
maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
@@ -684,6 +714,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
684714
{AlreadyDropped, State}
685715
end.
686716

717+
send_reject_publish(#delivery{confirm = true,
718+
sender = SenderPid,
719+
msg_seq_no = MsgSeqNo} = Delivery,
720+
_Delivered,
721+
State = #q{ backing_queue = BQ,
722+
backing_queue_state = BQS,
723+
msg_id_to_channel = MTC}) ->
724+
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
725+
gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}),
726+
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
727+
send_reject_publish(#delivery{confirm = false},
728+
_Delivered, State) ->
729+
State.
730+
731+
will_overflow(_, #q{max_length = undefined,
732+
max_bytes = undefined}) -> false;
733+
will_overflow(#delivery{message = Message},
734+
#q{max_length = MaxLen,
735+
max_bytes = MaxBytes,
736+
backing_queue = BQ,
737+
backing_queue_state = BQS}) ->
738+
ExpectedQueueLength = BQ:len(BQS) + 1,
739+
740+
#basic_message{content = #content{payload_fragments_rev = PFR}} = Message,
741+
MessageSize = iolist_size(PFR),
742+
ExpectedQueueSizeBytes = BQ:info(message_bytes_ready, BQS) + MessageSize,
743+
744+
ExpectedQueueLength > MaxLen orelse ExpectedQueueSizeBytes > MaxBytes.
745+
687746
over_max_length(#q{max_length = MaxLen,
688747
max_bytes = MaxBytes,
689748
backing_queue = BQ,
@@ -1254,8 +1313,10 @@ handle_cast({run_backing_queue, Mod, Fun},
12541313
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
12551314
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
12561315

1257-
handle_cast({deliver, Delivery = #delivery{sender = Sender,
1258-
flow = Flow}, SlaveWhenPublished},
1316+
handle_cast({deliver,
1317+
Delivery = #delivery{sender = Sender,
1318+
flow = Flow},
1319+
SlaveWhenPublished},
12591320
State = #q{senders = Senders}) ->
12601321
Senders1 = case Flow of
12611322
%% In both credit_flow:ack/1 we are acking messages to the channel
@@ -1270,7 +1331,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
12701331
noflow -> Senders
12711332
end,
12721333
State1 = State#q{senders = Senders1},
1273-
noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
1334+
noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
12741335
%% [0] The second ack is since the channel thought we were a slave at
12751336
%% the time it published this message, so it used two credits (see
12761337
%% rabbit_amqqueue:deliver/2).

src/rabbit_channel.erl

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@
136136
%% a list of tags for published messages that were
137137
%% delivered but are yet to be confirmed to the client
138138
confirmed,
139+
%% a list of tags for published messages that were
140+
%% rejected but are yet to be sent to the client
141+
rejected,
139142
%% a dtree used to track oustanding notifications
140143
%% for messages published as mandatory
141144
mandatory,
@@ -419,6 +422,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
419422
confirm_enabled = false,
420423
publish_seqno = 1,
421424
unconfirmed = dtree:empty(),
425+
rejected = [],
422426
confirmed = [],
423427
mandatory = dtree:empty(),
424428
capabilities = Capabilities,
@@ -449,6 +453,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
449453
prioritise_cast(Msg, _Len, _State) ->
450454
case Msg of
451455
{confirm, _MsgSeqNos, _QPid} -> 5;
456+
{reject_publish, _MsgSeqNos, _QPid} -> 5;
452457
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
453458
_ -> 0
454459
end.
@@ -598,6 +603,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
598603
%% NB: don't call noreply/1 since we don't want to send confirms.
599604
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
600605

606+
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
607+
%% It does not matter which queue rejected the message,
608+
%% if any queue rejected it - it should not be confirmed.
609+
{MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
610+
%% NB: don't call noreply/1 since we don't want to send confirms.
611+
noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
612+
601613
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
602614
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
603615
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -621,7 +633,7 @@ handle_info(emit_stats, State) ->
621633
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
622634
%% NB: don't call noreply/1 since we don't want to kick off the
623635
%% stats timer.
624-
{noreply, send_confirms(State1), hibernate};
636+
{noreply, send_confirms_and_nacks(State1), hibernate};
625637

626638
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
627639
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -681,10 +693,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
681693

682694
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
683695

684-
next_state(State) -> ensure_stats_timer(send_confirms(State)).
696+
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
685697

686-
noreply_coalesce(State = #ch{confirmed = C}) ->
687-
Timeout = case C of [] -> hibernate; _ -> 0 end,
698+
noreply_coalesce(State = #ch{confirmed = C, rejected = R}) ->
699+
Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end,
688700
{noreply, ensure_stats_timer(State), Timeout}.
689701

690702
ensure_stats_timer(State) ->
@@ -818,7 +830,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
818830
RoutingKey,
819831
Permission) ->
820832
Resource = Name#resource{kind = topic},
821-
Timeout = get_operation_timeout(),
833+
Timeout = get_operation_timeout(),
822834
AmqpParams = case ConnPid of
823835
none ->
824836
%% Called from outside the channel by mgmt API
@@ -962,6 +974,15 @@ maybe_set_fast_reply_to(
962974
maybe_set_fast_reply_to(C, _State) ->
963975
C.
964976

977+
record_rejects([], State) ->
978+
State;
979+
record_rejects(MXs, State = #ch{rejected = R, tx = Tx}) ->
980+
Tx1 = case Tx of
981+
none -> none;
982+
_ -> failed
983+
end,
984+
State#ch{rejected = [MXs | R], tx = Tx1}.
985+
965986
record_confirms([], State) ->
966987
State;
967988
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -1874,21 +1895,24 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
18741895
send_nacks(_, State) ->
18751896
maybe_complete_tx(State#ch{tx = failed}).
18761897

1877-
send_confirms(State = #ch{tx = none, confirmed = []}) ->
1898+
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
18781899
State;
1879-
send_confirms(State = #ch{tx = none, confirmed = C}) ->
1900+
send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
18801901
case rabbit_node_monitor:pause_partition_guard() of
1881-
ok -> MsgSeqNos =
1902+
ok -> ConfirmMsgSeqNos =
18821903
lists:foldl(
18831904
fun ({MsgSeqNo, XName}, MSNs) ->
18841905
?INCR_STATS(exchange_stats, XName, 1,
18851906
confirm, State),
18861907
[MsgSeqNo | MSNs]
18871908
end, [], lists:append(C)),
1888-
send_confirms(MsgSeqNos, State#ch{confirmed = []});
1909+
State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
1910+
%% TODO: msg seq nos, same as for confirms. Need to implement
1911+
%% nack rates first.
1912+
send_nacks(lists:append(R), State1#ch{rejected = []});
18891913
pausing -> State
18901914
end;
1891-
send_confirms(State) ->
1915+
send_confirms_and_nacks(State) ->
18921916
case rabbit_node_monitor:pause_partition_guard() of
18931917
ok -> maybe_complete_tx(State);
18941918
pausing -> State

src/rabbit_mirror_queue_slave.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
302302
%% We are acking messages to the channel process that sent us
303303
%% the message delivery. See
304304
%% rabbit_amqqueue_process:handle_ch_down for more info.
305+
%% If message is rejected by the master, the publish will be nacked
306+
%% even if slaves confirm it. No need to check for length here.
305307
maybe_flow_ack(Sender, Flow),
306308
noreply(maybe_enqueue_message(Delivery, State));
307309

src/rabbit_policies.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ register() ->
4242
{policy_validator, <<"max-length">>},
4343
{policy_validator, <<"max-length-bytes">>},
4444
{policy_validator, <<"queue-mode">>},
45+
{policy_validator, <<"overflow">>},
4546
{operator_policy_validator, <<"expires">>},
4647
{operator_policy_validator, <<"message-ttl">>},
4748
{operator_policy_validator, <<"max-length">>},
@@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) ->
104105
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
105106
ok;
106107
validate_policy0(<<"queue-mode">>, Value) ->
107-
{error, "~p is not a valid queue-mode value", [Value]}.
108+
{error, "~p is not a valid queue-mode value", [Value]};
109+
validate_policy0(<<"overflow">>, <<"drop-head">>) ->
110+
ok;
111+
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
112+
ok;
113+
validate_policy0(<<"overflow">>, Value) ->
114+
{error, "~p is not a valid overflow value", [Value]}.
108115

109116
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
110117
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);

test/clustering_management_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ assert_failure(Fun) ->
708708
%% Failure to start an app result in node shutdown
709709
{badrpc, nodedown} -> nodedown;
710710
{badrpc_multi, Reason, _Nodes} -> Reason;
711-
Other -> exit({expected_failure, Other})
711+
Other -> error({expected_failure, Other})
712712
end.
713713

714714
stop_app(Node) ->

0 commit comments

Comments
 (0)