Skip to content

Commit 15a4cbf

Browse files
author
Simon MacMullen
committed
Merge bug24860
2 parents 051c149 + 5a76993 commit 15a4cbf

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

src/rabbit_channel.erl

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
conn_name, limiter, tx_status, next_tag, unacked_message_q,
3737
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
3838
virtual_host, most_recently_declared_queue, queue_monitors,
39-
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
40-
stats_timer, confirm_enabled, publish_seqno, unconfirmed,
41-
confirmed, capabilities, trace_state}).
39+
consumer_mapping, blocking, queue_consumers, delivering_queues,
40+
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
41+
unconfirmed, confirmed, capabilities, trace_state}).
4242

4343
-define(MAX_PERMISSION_CACHE_SIZE, 12).
4444

@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
198198
consumer_mapping = dict:new(),
199199
blocking = sets:new(),
200200
queue_consumers = dict:new(),
201+
delivering_queues = sets:new(),
201202
queue_collector_pid = CollectorPid,
202203
confirm_enabled = false,
203204
publish_seqno = 1,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
331332
State1 = handle_publishing_queue_down(QPid, Reason, State),
332333
State2 = queue_blocked(QPid, State1),
333334
State3 = handle_consuming_queue_down(QPid, State2),
335+
State4 = handle_delivering_queue_down(QPid, State3),
334336
credit_flow:peer_down(QPid),
335337
erase_queue_stats(QPid),
336338
noreply(State3#ch{queue_monitors = pmon:erase(
337-
QPid, State3#ch.queue_monitors)});
339+
QPid, State4#ch.queue_monitors)});
338340

339341
handle_info({'EXIT', _Pid, Reason}, State) ->
340342
{stop, Reason, State}.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
657659
QueueName, ConnPid,
658660
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
659661
{ok, MessageCount,
660-
Msg = {_QName, _QPid, _MsgId, Redelivered,
662+
Msg = {_QName, QPid, _MsgId, Redelivered,
661663
#basic_message{exchange_name = ExchangeName,
662664
routing_keys = [RoutingKey | _CcRoutes],
663665
content = Content}}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
669671
routing_key = RoutingKey,
670672
message_count = MessageCount},
671673
Content),
672-
{noreply, record_sent(none, not(NoAck), Msg, State)};
674+
State1 = monitor_delivering_queue(NoAck, QPid, State),
675+
{noreply, record_sent(none, not(NoAck), Msg, State1)};
673676
empty ->
674677
{reply, #'basic.get_empty'{}, State}
675678
end;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
707710
consumer_tag = ActualConsumerTag})),
708711
Q}
709712
end) of
710-
{ok, Q} ->
711-
State1 = State#ch{consumer_mapping =
712-
dict:store(ActualConsumerTag, Q,
713-
ConsumerMapping)},
713+
{ok, Q = #amqqueue{pid = QPid}} ->
714+
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
715+
State1 = monitor_delivering_queue(
716+
NoAck, QPid, State#ch{consumer_mapping = CM1}),
714717
{noreply,
715718
case NoWait of
716719
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
11081111
State
11091112
end.
11101113

1114+
monitor_delivering_queue(true, _QPid, State) ->
1115+
State;
1116+
monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
1117+
delivering_queues = DQ}) ->
1118+
State#ch{queue_monitors = pmon:monitor(QPid, QMons),
1119+
delivering_queues = sets:add_element(QPid, DQ)}.
1120+
11111121
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
11121122
case rabbit_misc:is_abnormal_termination(Reason) of
11131123
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
11341144
State#ch{consumer_mapping = ConsumerMapping1,
11351145
queue_consumers = dict:erase(QPid, QCons)}.
11361146

1147+
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
1148+
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
1149+
11371150
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
11381151
RoutingKey, Arguments, ReturnMethod, NoWait,
11391152
State = #ch{virtual_host = VHostPath,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
12691282

12701283
notify_queues(State = #ch{state = closing}) ->
12711284
{ok, State};
1272-
notify_queues(State = #ch{consumer_mapping = Consumers}) ->
1273-
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
1274-
State#ch{state = closing}}.
1285+
notify_queues(State = #ch{consumer_mapping = Consumers,
1286+
delivering_queues = DQ }) ->
1287+
QPids = sets:to_list(
1288+
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
1289+
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
12751290

12761291
fold_per_queue(_F, Acc, []) ->
12771292
Acc;

0 commit comments

Comments
 (0)