Skip to content

Commit 9dc09c6

Browse files
author
Emile Joubert
committed
Merged bug24885 into default
2 parents b252e9b + 1ba7b54 commit 9dc09c6

File tree

7 files changed

+83
-54
lines changed

7 files changed

+83
-54
lines changed

packaging/common/rabbitmq-server.init

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ test -x $CONTROL || exit 0
3535
RETVAL=0
3636
set -e
3737

38+
[ -f /etc/default/${NAME} ] && . /etc/default/${NAME}
39+
3840
ensure_pid_dir () {
3941
PID_DIR=`dirname ${PID_FILE}`
4042
if [ ! -d ${PID_DIR} ] ; then
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# This file is sourced by /etc/init.d/rabbitmq-server. Its primary
2+
# reason for existing is to allow adjustment of system limits for the
3+
# rabbitmq-server process.
4+
#
5+
# Maximum number of open file handles. This will need to be increased
6+
# to handle many simultaneous connections. Refer to the system
7+
# documentation for ulimit (in man bash) for more information.
8+
#
9+
#ulimit -n 1024

packaging/debs/Debian/debian/rules

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ install/rabbitmq-server::
1919
done
2020
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
2121
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
22+
install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server

src/rabbit_amqqueue_process.erl

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -723,38 +723,39 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
723723
ensure_ttl_timer(State) ->
724724
State.
725725

726+
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
727+
backing_queue = BQ,
728+
backing_queue_state = BQS }) ->
729+
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
730+
State#q{backing_queue_state = BQS1};
731+
ack_if_no_dlx(_AckTags, State) ->
732+
State.
733+
726734
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
727735
undefined;
728736
dead_letter_fun(Reason, _State) ->
729737
fun(Msg, AckTag) ->
730738
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
731739
end.
732740

733-
dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
734-
case rabbit_exchange:lookup(DLX) of
735-
{error, not_found} -> noreply(State);
736-
_ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
737-
State)
741+
dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
742+
DLMsg = #basic_message{exchange_name = XName} =
743+
make_dead_letter_msg(Reason, Msg, State),
744+
case rabbit_exchange:lookup(XName) of
745+
{ok, X} ->
746+
Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
747+
{Queues, Cycles} = detect_dead_letter_cycles(
748+
DLMsg, rabbit_exchange:route(X, Delivery)),
749+
lists:foreach(fun log_cycle_once/1, Cycles),
750+
QPids = rabbit_amqqueue:lookup(Queues),
751+
{_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
752+
DeliveredQPids;
753+
{error, not_found} ->
754+
[]
738755
end.
739756

740-
dead_letter_publish(Msg, Reason,
741-
State = #q{publish_seqno = MsgSeqNo,
742-
dlx = DLX}) ->
743-
Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
744-
rabbit_basic:delivery(
745-
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
746-
MsgSeqNo),
747-
{ok, X} = rabbit_exchange:lookup(XName),
748-
Queues = rabbit_exchange:route(X, Delivery),
749-
{Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
750-
lists:foreach(fun log_cycle_once/1, Cycles),
751-
QPids = rabbit_amqqueue:lookup(Queues1),
752-
{_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
753-
DeliveredQPids.
754-
755-
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
756-
State = #q{publish_seqno = MsgSeqNo,
757-
unconfirmed = UC}) ->
757+
dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
758+
unconfirmed = UC}) ->
758759
QPids = dead_letter_publish(Msg, Reason, State),
759760
State1 = State#q{queue_monitors = pmon:monitor_all(
760761
QPids, State#q.queue_monitors),
@@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
813814
false -> noreply(State1)
814815
end.
815816

816-
detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
817-
Queues) ->
817+
detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
818818
#content{properties = #'P_basic'{headers = Headers}} =
819819
rabbit_binary_parser:ensure_content_decoded(Content),
820820
NoCycles = {Queues, []},
@@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}
841841
end
842842
end.
843843

844-
make_dead_letter_msg(DLX, Reason,
844+
make_dead_letter_msg(Reason,
845845
Msg = #basic_message{content = Content,
846846
exchange_name = Exchange,
847847
routing_keys = RoutingKeys},
848-
State = #q{dlx_routing_key = DlxRoutingKey}) ->
848+
State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
849849
{DeathRoutingKeys, HeadersFun1} =
850850
case DlxRoutingKey of
851851
undefined -> {RoutingKeys, fun (H) -> H end};
852852
_ -> {[DlxRoutingKey],
853853
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
854854
end,
855+
ReasonBin = list_to_binary(atom_to_list(Reason)),
855856
#resource{name = QName} = qname(State),
857+
TimeSec = rabbit_misc:now_ms() div 1000,
856858
HeadersFun2 =
857859
fun (Headers) ->
858860
%% The first routing key is the one specified in the
859861
%% basic.publish; all others are CC or BCC keys.
860-
RoutingKeys1 =
861-
[hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
862-
Info = [{<<"reason">>,
863-
longstr, list_to_binary(atom_to_list(Reason))},
864-
{<<"queue">>, longstr, QName},
865-
{<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
866-
{<<"exchange">>, longstr, Exchange#resource.name},
867-
{<<"routing-keys">>, array,
868-
[{longstr, Key} || Key <- RoutingKeys1]}],
862+
RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
863+
RKs1 = [{longstr, Key} || Key <- RKs],
864+
Info = [{<<"reason">>, longstr, ReasonBin},
865+
{<<"queue">>, longstr, QName},
866+
{<<"time">>, timestamp, TimeSec},
867+
{<<"exchange">>, longstr, Exchange#resource.name},
868+
{<<"routing-keys">>, array, RKs1}],
869869
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
870870
Info, Headers))
871871
end,
@@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
12401240
ChPid, AckTags, State,
12411241
case Requeue of
12421242
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
1243-
false -> Fun = dead_letter_fun(rejected, State),
1244-
fun (State1 = #q{backing_queue = BQ,
1243+
false -> fun (State1 = #q{backing_queue = BQ,
12451244
backing_queue_state = BQS}) ->
1245+
Fun = dead_letter_fun(rejected, State1),
12461246
BQS1 = BQ:fold(Fun, BQS, AckTags),
1247-
State1#q{backing_queue_state = BQS1}
1247+
ack_if_no_dlx(
1248+
AckTags,
1249+
State1#q{backing_queue_state = BQS1})
12481250
end
12491251
end));
12501252

src/rabbit_basic.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363

6464
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
6565

66-
-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
66+
-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
6767
-> rabbit_types:content()).
6868

6969
-spec(header_routes/1 ::

src/rabbit_channel.erl

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
conn_name, limiter, tx_status, next_tag, unacked_message_q,
3737
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
3838
virtual_host, most_recently_declared_queue, queue_monitors,
39-
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
40-
stats_timer, confirm_enabled, publish_seqno, unconfirmed,
41-
confirmed, capabilities, trace_state}).
39+
consumer_mapping, blocking, queue_consumers, delivering_queues,
40+
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
41+
unconfirmed, confirmed, capabilities, trace_state}).
4242

4343
-define(MAX_PERMISSION_CACHE_SIZE, 12).
4444

@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
198198
consumer_mapping = dict:new(),
199199
blocking = sets:new(),
200200
queue_consumers = dict:new(),
201+
delivering_queues = sets:new(),
201202
queue_collector_pid = CollectorPid,
202203
confirm_enabled = false,
203204
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
331332
State1 = handle_publishing_queue_down(QPid, Reason, State),
332333
State2 = queue_blocked(QPid, State1),
333334
State3 = handle_consuming_queue_down(QPid, State2),
335+
State4 = handle_delivering_queue_down(QPid, State3),
334336
credit_flow:peer_down(QPid),
335337
erase_queue_stats(QPid),
336338
noreply(State3#ch{queue_monitors = pmon:erase(
337-
QPid, State3#ch.queue_monitors)});
339+
QPid, State4#ch.queue_monitors)});
338340

339341
handle_info({'EXIT', _Pid, Reason}, State) ->
340342
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
657659
QueueName, ConnPid,
658660
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
659661
{ok, MessageCount,
660-
Msg = {_QName, _QPid, _MsgId, Redelivered,
662+
Msg = {_QName, QPid, _MsgId, Redelivered,
661663
#basic_message{exchange_name = ExchangeName,
662664
routing_keys = [RoutingKey | _CcRoutes],
663665
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
669671
routing_key = RoutingKey,
670672
message_count = MessageCount},
671673
Content),
672-
{noreply, record_sent(none, not(NoAck), Msg, State)};
674+
State1 = monitor_delivering_queue(NoAck, QPid, State),
675+
{noreply, record_sent(none, not(NoAck), Msg, State1)};
673676
empty ->
674677
{reply, #'basic.get_empty'{}, State}
675678
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
707710
consumer_tag = ActualConsumerTag})),
708711
Q}
709712
end) of
710-
{ok, Q} ->
711-
State1 = State#ch{consumer_mapping =
712-
dict:store(ActualConsumerTag, Q,
713-
ConsumerMapping)},
713+
{ok, Q = #amqqueue{pid = QPid}} ->
714+
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
715+
State1 = monitor_delivering_queue(
716+
NoAck, QPid, State#ch{consumer_mapping = CM1}),
714717
{noreply,
715718
case NoWait of
716719
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
11081111
State
11091112
end.
11101113

1114+
monitor_delivering_queue(true, _QPid, State) ->
1115+
State;
1116+
monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
1117+
delivering_queues = DQ}) ->
1118+
State#ch{queue_monitors = pmon:monitor(QPid, QMons),
1119+
delivering_queues = sets:add_element(QPid, DQ)}.
1120+
11111121
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
11121122
case rabbit_misc:is_abnormal_termination(Reason) of
11131123
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
11341144
State#ch{consumer_mapping = ConsumerMapping1,
11351145
queue_consumers = dict:erase(QPid, QCons)}.
11361146

1147+
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
1148+
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
1149+
11371150
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
11381151
RoutingKey, Arguments, ReturnMethod, NoWait,
11391152
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
12691282

12701283
notify_queues(State = #ch{state = closing}) ->
12711284
{ok, State};
1272-
notify_queues(State = #ch{consumer_mapping = Consumers}) ->
1273-
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
1274-
State#ch{state = closing}}.
1285+
notify_queues(State = #ch{consumer_mapping = Consumers,
1286+
delivering_queues = DQ }) ->
1287+
QPids = sets:to_list(
1288+
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
1289+
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
12751290

12761291
fold_per_queue(_F, Acc, []) ->
12771292
Acc;

src/rabbit_mirror_queue_coordinator.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
356356
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
357357
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
358358

359-
handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
359+
handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
360360
gm:broadcast(GM, heartbeat),
361361
ensure_gm_heartbeat(),
362362
noreply(State);

0 commit comments

Comments
 (0)