3636 conn_name , limiter , tx_status , next_tag , unacked_message_q ,
3737 uncommitted_message_q , uncommitted_acks , uncommitted_nacks , user ,
3838 virtual_host , most_recently_declared_queue , queue_monitors ,
39- consumer_mapping , blocking , queue_consumers , queue_collector_pid ,
40- stats_timer , confirm_enabled , publish_seqno , unconfirmed ,
41- confirmed , capabilities , trace_state }).
39+ consumer_mapping , blocking , queue_consumers , delivering_queues ,
40+ queue_collector_pid , stats_timer , confirm_enabled , publish_seqno ,
41+ unconfirmed , confirmed , capabilities , trace_state }).
4242
4343-define (MAX_PERMISSION_CACHE_SIZE , 12 ).
4444
@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
198198 consumer_mapping = dict :new (),
199199 blocking = sets :new (),
200200 queue_consumers = dict :new (),
201+ delivering_queues = sets :new (),
201202 queue_collector_pid = CollectorPid ,
202203 confirm_enabled = false ,
203204 publish_seqno = 1 ,
@@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
331332 State1 = handle_publishing_queue_down (QPid , Reason , State ),
332333 State2 = queue_blocked (QPid , State1 ),
333334 State3 = handle_consuming_queue_down (QPid , State2 ),
335+ State4 = handle_delivering_queue_down (QPid , State3 ),
334336 credit_flow :peer_down (QPid ),
335337 erase_queue_stats (QPid ),
336338 noreply (State3 # ch {queue_monitors = pmon :erase (
337- QPid , State3 # ch .queue_monitors )});
339+ QPid , State4 # ch .queue_monitors )});
338340
339341handle_info ({'EXIT' , _Pid , Reason }, State ) ->
340342 {stop , Reason , State }.
@@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
657659 QueueName , ConnPid ,
658660 fun (Q ) -> rabbit_amqqueue :basic_get (Q , self (), NoAck ) end ) of
659661 {ok , MessageCount ,
660- Msg = {_QName , _QPid , _MsgId , Redelivered ,
662+ Msg = {_QName , QPid , _MsgId , Redelivered ,
661663 # basic_message {exchange_name = ExchangeName ,
662664 routing_keys = [RoutingKey | _CcRoutes ],
663665 content = Content }}} ->
@@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
669671 routing_key = RoutingKey ,
670672 message_count = MessageCount },
671673 Content ),
672- {noreply , record_sent (none , not (NoAck ), Msg , State )};
674+ State1 = monitor_delivering_queue (NoAck , QPid , State ),
675+ {noreply , record_sent (none , not (NoAck ), Msg , State1 )};
673676 empty ->
674677 {reply , # 'basic.get_empty' {}, State }
675678 end ;
@@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
707710 consumer_tag = ActualConsumerTag })),
708711 Q }
709712 end ) of
710- {ok , Q } ->
711- State1 = State # ch { consumer_mapping =
712- dict : store ( ActualConsumerTag , Q ,
713- ConsumerMapping )} ,
713+ {ok , Q = # amqqueue { pid = QPid } } ->
714+ CM1 = dict : store ( ActualConsumerTag , Q , ConsumerMapping ),
715+ State1 = monitor_delivering_queue (
716+ NoAck , QPid , State # ch { consumer_mapping = CM1 }) ,
714717 {noreply ,
715718 case NoWait of
716719 true -> consumer_monitor (ActualConsumerTag , State1 );
@@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag,
11081111 State
11091112 end .
11101113
1114+ monitor_delivering_queue (true , _QPid , State ) ->
1115+ State ;
1116+ monitor_delivering_queue (false , QPid , State = # ch {queue_monitors = QMons ,
1117+ delivering_queues = DQ }) ->
1118+ State # ch {queue_monitors = pmon :monitor (QPid , QMons ),
1119+ delivering_queues = sets :add_element (QPid , DQ )}.
1120+
11111121handle_publishing_queue_down (QPid , Reason , State = # ch {unconfirmed = UC }) ->
11121122 case rabbit_misc :is_abnormal_termination (Reason ) of
11131123 true -> {MXs , UC1 } = dtree :take_all (QPid , UC ),
@@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid,
11341144 State # ch {consumer_mapping = ConsumerMapping1 ,
11351145 queue_consumers = dict :erase (QPid , QCons )}.
11361146
1147+ handle_delivering_queue_down (QPid , State = # ch {delivering_queues = DQ }) ->
1148+ State # ch {delivering_queues = sets :del_element (QPid , DQ )}.
1149+
11371150binding_action (Fun , ExchangeNameBin , DestinationType , DestinationNameBin ,
11381151 RoutingKey , Arguments , ReturnMethod , NoWait ,
11391152 State = # ch {virtual_host = VHostPath ,
@@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
12691282
12701283notify_queues (State = # ch {state = closing }) ->
12711284 {ok , State };
1272- notify_queues (State = # ch {consumer_mapping = Consumers }) ->
1273- {rabbit_amqqueue :notify_down_all (consumer_queues (Consumers ), self ()),
1274- State # ch {state = closing }}.
1285+ notify_queues (State = # ch {consumer_mapping = Consumers ,
1286+ delivering_queues = DQ }) ->
1287+ QPids = sets :to_list (
1288+ sets :union (sets :from_list (consumer_queues (Consumers )), DQ )),
1289+ {rabbit_amqqueue :notify_down_all (QPids , self ()), State # ch {state = closing }}.
12751290
12761291fold_per_queue (_F , Acc , []) ->
12771292 Acc ;
0 commit comments