Skip to content

Commit 7b094e7

Browse files
author
Simon MacMullen
committed
Merge bug24848
2 parents cef3c28 + 9dc09c6 commit 7b094e7

13 files changed

+159
-116
lines changed

packaging/common/rabbitmq-server.init

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ test -x $CONTROL || exit 0
3535
RETVAL=0
3636
set -e
3737

38+
[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
39+
3840
ensure_pid_dir () {
3941
PID_DIR=`dirname ${PID_FILE}`
4042
if [ ! -d ${PID_DIR} ] ; then
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# This file is sourced by /etc/init.d/rabbitmq-server. Its primary
2+
# reason for existing is to allow adjustment of system limits for the
3+
# rabbitmq-server process.
4+
#
5+
# Maximum number of open file handles. This will need to be increased
6+
# to handle many simultaneous connections. Refer to the system
7+
# documentation for ulimit (in man bash) for more information.
8+
#
9+
#ulimit -n 1024

packaging/debs/Debian/debian/rules

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ install/rabbitmq-server::
1919
done
2020
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
2121
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
22+
install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server

src/rabbit_amqqueue_process.erl

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
696696
drop_expired_messages(State = #q{ttl = undefined}) ->
697697
State;
698698
drop_expired_messages(State = #q{backing_queue_state = BQS,
699-
backing_queue = BQ}) ->
699+
backing_queue = BQ }) ->
700700
Now = now_micros(),
701-
BQS1 = BQ:dropwhile(
702-
fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
703-
dead_letter_fun(expired, State),
704-
BQS),
701+
DLXFun = dead_letter_fun(expired, State),
702+
ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
703+
case DLXFun of
704+
undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
705+
BQS1;
706+
_ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
707+
lists:foreach(
708+
fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
709+
BQS1
710+
end,
705711
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
706712

707713
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -717,29 +723,40 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
717723
ensure_ttl_timer(State) ->
718724
State.
719725

726+
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
727+
backing_queue = BQ,
728+
backing_queue_state = BQS }) ->
729+
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
730+
State#q{backing_queue_state = BQS1};
731+
ack_if_no_dlx(_AckTags, State) ->
732+
State.
733+
720734
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
721735
undefined;
722736
dead_letter_fun(Reason, _State) ->
723737
fun(Msg, AckTag) ->
724738
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
725739
end.
726740

727-
dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
728-
case rabbit_exchange:lookup(DLX) of
729-
{error, not_found} -> noreply(State);
730-
_ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
731-
State)
741+
dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
742+
DLMsg = #basic_message{exchange_name = XName} =
743+
make_dead_letter_msg(Reason, Msg, State),
744+
case rabbit_exchange:lookup(XName) of
745+
{ok, X} ->
746+
Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
747+
{Queues, Cycles} = detect_dead_letter_cycles(
748+
DLMsg, rabbit_exchange:route(X, Delivery)),
749+
lists:foreach(fun log_cycle_once/1, Cycles),
750+
QPids = rabbit_amqqueue:lookup(Queues),
751+
{_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
752+
DeliveredQPids;
753+
{error, not_found} ->
754+
[]
732755
end.
733756

734-
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
735-
State = #q{publish_seqno = MsgSeqNo,
736-
unconfirmed = UC,
737-
dlx = DLX}) ->
738-
{ok, _, QPids} =
739-
rabbit_basic:publish(
740-
rabbit_basic:delivery(
741-
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
742-
MsgSeqNo)),
757+
dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
758+
unconfirmed = UC}) ->
759+
QPids = dead_letter_publish(Msg, Reason, State),
743760
State1 = State#q{queue_monitors = pmon:monitor_all(
744761
QPids, State#q.queue_monitors),
745762
publish_seqno = MsgSeqNo + 1},
@@ -797,56 +814,58 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
797814
false -> noreply(State1)
798815
end.
799816

800-
already_been_here(_Delivery, #q{dlx = undefined}) ->
801-
false;
802-
already_been_here(#delivery{message = #basic_message{content = Content}},
803-
State) ->
817+
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
804818
#content{properties = #'P_basic'{headers = Headers}} =
805819
rabbit_binary_parser:ensure_content_decoded(Content),
806-
#resource{name = QueueName} = qname(State),
820+
NoCycles = {Queues, []},
807821
case Headers of
808822
undefined ->
809-
false;
823+
NoCycles;
810824
_ ->
811825
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
812826
{array, DeathTables} ->
813827
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
814828
{table, D} <- DeathTables],
815829
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
816-
case lists:member(QueueName, OldQueues1) of
817-
true -> [QueueName | OldQueues1];
818-
_ -> false
819-
end;
830+
OldQueuesSet = ordsets:from_list(OldQueues1),
831+
{Cycling, NotCycling} =
832+
lists:partition(
833+
fun(Queue) ->
834+
ordsets:is_element(Queue#resource.name,
835+
OldQueuesSet)
836+
end, Queues),
837+
{NotCycling, [[QName | OldQueues1] ||
838+
#resource{name = QName} <- Cycling]};
820839
_ ->
821-
false
840+
NoCycles
822841
end
823842
end.
824843

825-
make_dead_letter_msg(DLX, Reason,
844+
make_dead_letter_msg(Reason,
826845
Msg = #basic_message{content = Content,
827846
exchange_name = Exchange,
828847
routing_keys = RoutingKeys},
829-
State = #q{dlx_routing_key = DlxRoutingKey}) ->
848+
State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
830849
{DeathRoutingKeys, HeadersFun1} =
831850
case DlxRoutingKey of
832851
undefined -> {RoutingKeys, fun (H) -> H end};
833852
_ -> {[DlxRoutingKey],
834853
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
835854
end,
855+
ReasonBin = list_to_binary(atom_to_list(Reason)),
836856
#resource{name = QName} = qname(State),
857+
TimeSec = rabbit_misc:now_ms() div 1000,
837858
HeadersFun2 =
838859
fun (Headers) ->
839860
%% The first routing key is the one specified in the
840861
%% basic.publish; all others are CC or BCC keys.
841-
RoutingKeys1 =
842-
[hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
843-
Info = [{<<"reason">>,
844-
longstr, list_to_binary(atom_to_list(Reason))},
845-
{<<"queue">>, longstr, QName},
846-
{<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
847-
{<<"exchange">>, longstr, Exchange#resource.name},
848-
{<<"routing-keys">>, array,
849-
[{longstr, Key} || Key <- RoutingKeys1]}],
862+
RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
863+
RKs1 = [{longstr, Key} || Key <- RKs],
864+
Info = [{<<"reason">>, longstr, ReasonBin},
865+
{<<"queue">>, longstr, QName},
866+
{<<"time">>, timestamp, TimeSec},
867+
{<<"exchange">>, longstr, Exchange#resource.name},
868+
{<<"routing-keys">>, array, RKs1}],
850869
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
851870
Info, Headers))
852871
end,
@@ -1196,8 +1215,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
11961215
handle_cast({run_backing_queue, Mod, Fun}, State) ->
11971216
noreply(run_backing_queue(Mod, Fun, State));
11981217

1199-
handle_cast({deliver, Delivery = #delivery{sender = Sender,
1200-
msg_seq_no = MsgSeqNo}, Flow},
1218+
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
12011219
State = #q{senders = Senders}) ->
12021220
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
12031221
Senders1 = case Flow of
@@ -1206,12 +1224,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
12061224
noflow -> Senders
12071225
end,
12081226
State1 = State#q{senders = Senders1},
1209-
case already_been_here(Delivery, State1) of
1210-
false -> noreply(deliver_or_enqueue(Delivery, State1));
1211-
Qs -> log_cycle_once(Qs),
1212-
rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
1213-
noreply(State1)
1214-
end;
1227+
noreply(deliver_or_enqueue(Delivery, State1));
12151228

12161229
handle_cast({ack, AckTags, ChPid}, State) ->
12171230
noreply(subtract_acks(
@@ -1227,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
12271240
ChPid, AckTags, State,
12281241
case Requeue of
12291242
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
1230-
false -> Fun = dead_letter_fun(rejected, State),
1231-
fun (State1 = #q{backing_queue = BQ,
1243+
false -> fun (State1 = #q{backing_queue = BQ,
12321244
backing_queue_state = BQS}) ->
1245+
Fun = dead_letter_fun(rejected, State1),
12331246
BQS1 = BQ:fold(Fun, BQS, AckTags),
1234-
State1#q{backing_queue_state = BQS1}
1247+
ack_if_no_dlx(
1248+
AckTags,
1249+
State1#q{backing_queue_state = BQS1})
12351250
end
12361251
end));
12371252

src/rabbit_backing_queue.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
3737
'undefined').
38+
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
3839

3940
%% Called on startup with a list of durable queue names. The queues
4041
%% aren't being started at this point, but this call allows the
@@ -117,12 +118,14 @@
117118
%% be ignored.
118119
-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
119120

120-
%% Drop messages from the head of the queue while the supplied
121-
%% predicate returns true. A callback function is supplied allowing
122-
%% callers access to messages that are about to be dropped.
123-
-callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
124-
state())
125-
-> state().
121+
%% Drop messages from the head of the queue while the supplied predicate returns
122+
%% true. Also accepts a boolean parameter that determines whether the messages
123+
%% necessitate an ack or not. If they do, the function returns a list of
124+
%% messages with the respective acktags.
125+
-callback dropwhile(msg_pred(), true, state())
126+
-> {[{rabbit_types:basic_message(), ack()}], state()};
127+
(msg_pred(), false, state())
128+
-> {undefined, state()}.
126129

127130
%% Produce the next message.
128131
-callback fetch(true, state()) -> {fetch_result(ack()), state()};

src/rabbit_backing_queue_qc.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
141141
{call, ?BQMOD, drain_confirmed, [BQ]}.
142142

143143
qc_dropwhile(#state{bqstate = BQ}) ->
144-
{call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
144+
{call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
145145

146146
qc_is_empty(#state{bqstate = BQ}) ->
147147
{call, ?BQMOD, is_empty, [BQ]}.
@@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
267267
BQ1 = {call, erlang, element, [2, Res]},
268268
S#state{bqstate = BQ1};
269269

270-
next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
270+
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
271+
BQ = {call, erlang, element, [2, Res]},
271272
#state{messages = Messages} = S,
272273
Msgs1 = drop_messages(Messages),
273-
S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
274+
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
274275

275276
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
276277
S;

src/rabbit_basic.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
6565

66-
-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
66+
-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
6767
-> rabbit_types:content()).
6868

6969
-spec(header_routes/1 ::

src/rabbit_channel.erl

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
conn_name, limiter, tx_status, next_tag, unacked_message_q,
3737
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
3838
virtual_host, most_recently_declared_queue, queue_monitors,
39-
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
40-
stats_timer, confirm_enabled, publish_seqno, unconfirmed,
41-
confirmed, capabilities, trace_state}).
39+
consumer_mapping, blocking, queue_consumers, delivering_queues,
40+
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
41+
unconfirmed, confirmed, capabilities, trace_state}).
4242

4343
-define(MAX_PERMISSION_CACHE_SIZE, 12).
4444

@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
198198
consumer_mapping = dict:new(),
199199
blocking = sets:new(),
200200
queue_consumers = dict:new(),
201+
delivering_queues = sets:new(),
201202
queue_collector_pid = CollectorPid,
202203
confirm_enabled = false,
203204
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
331332
State1 = handle_publishing_queue_down(QPid, Reason, State),
332333
State2 = queue_blocked(QPid, State1),
333334
State3 = handle_consuming_queue_down(QPid, State2),
335+
State4 = handle_delivering_queue_down(QPid, State3),
334336
credit_flow:peer_down(QPid),
335337
erase_queue_stats(QPid),
336338
noreply(State3#ch{queue_monitors = pmon:erase(
337-
QPid, State3#ch.queue_monitors)});
339+
QPid, State4#ch.queue_monitors)});
338340

339341
handle_info({'EXIT', _Pid, Reason}, State) ->
340342
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
657659
QueueName, ConnPid,
658660
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
659661
{ok, MessageCount,
660-
Msg = {_QName, _QPid, _MsgId, Redelivered,
662+
Msg = {_QName, QPid, _MsgId, Redelivered,
661663
#basic_message{exchange_name = ExchangeName,
662664
routing_keys = [RoutingKey | _CcRoutes],
663665
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
669671
routing_key = RoutingKey,
670672
message_count = MessageCount},
671673
Content),
672-
{noreply, record_sent(none, not(NoAck), Msg, State)};
674+
State1 = monitor_delivering_queue(NoAck, QPid, State),
675+
{noreply, record_sent(none, not(NoAck), Msg, State1)};
673676
empty ->
674677
{reply, #'basic.get_empty'{}, State}
675678
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
707710
consumer_tag = ActualConsumerTag})),
708711
Q}
709712
end) of
710-
{ok, Q} ->
711-
State1 = State#ch{consumer_mapping =
712-
dict:store(ActualConsumerTag, Q,
713-
ConsumerMapping)},
713+
{ok, Q = #amqqueue{pid = QPid}} ->
714+
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
715+
State1 = monitor_delivering_queue(
716+
NoAck, QPid, State#ch{consumer_mapping = CM1}),
714717
{noreply,
715718
case NoWait of
716719
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
11081111
State
11091112
end.
11101113

1114+
monitor_delivering_queue(true, _QPid, State) ->
1115+
State;
1116+
monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
1117+
delivering_queues = DQ}) ->
1118+
State#ch{queue_monitors = pmon:monitor(QPid, QMons),
1119+
delivering_queues = sets:add_element(QPid, DQ)}.
1120+
11111121
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
11121122
case rabbit_misc:is_abnormal_termination(Reason) of
11131123
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
11341144
State#ch{consumer_mapping = ConsumerMapping1,
11351145
queue_consumers = dict:erase(QPid, QCons)}.
11361146

1147+
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
1148+
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
1149+
11371150
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
11381151
RoutingKey, Arguments, ReturnMethod, NoWait,
11391152
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
12691282

12701283
notify_queues(State = #ch{state = closing}) ->
12711284
{ok, State};
1272-
notify_queues(State = #ch{consumer_mapping = Consumers}) ->
1273-
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
1274-
State#ch{state = closing}}.
1285+
notify_queues(State = #ch{consumer_mapping = Consumers,
1286+
delivering_queues = DQ }) ->
1287+
QPids = sets:to_list(
1288+
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
1289+
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
12751290

12761291
fold_per_queue(_F, Acc, []) ->
12771292
Acc;

0 commit comments

Comments
 (0)