5353 has_had_consumers ,
5454 next_msg_id ,
5555 message_buffer ,
56- round_robin }).
56+ round_robin ,
57+ blocked_consumers }).
5758
5859-record (consumer , {tag , ack_required }).
5960
6061-record (tx , {ch_pid , is_persistent , pending_messages , pending_acks }).
6162
6263% % These are held in our process dictionary
63- -record (cr , {consumers ,
64+ -record (cr , {consumer_count ,
6465 ch_pid ,
6566 limiter_pid ,
6667 monitor_ref ,
@@ -99,7 +100,8 @@ init(Q) ->
99100 has_had_consumers = false ,
100101 next_msg_id = 1 ,
101102 message_buffer = queue :new (),
102- round_robin = queue :new ()}, ? HIBERNATE_AFTER }.
103+ round_robin = queue :new (),
104+ blocked_consumers = queue :new ()}, ? HIBERNATE_AFTER }.
103105
104106terminate (_Reason , State ) ->
105107 % % FIXME: How do we cancel active subscriptions?
@@ -129,7 +131,7 @@ ch_record(ChPid) ->
129131 case get (Key ) of
130132 undefined ->
131133 MonitorRef = erlang :monitor (process , ChPid ),
132- C = # cr {consumers = [] ,
134+ C = # cr {consumer_count = 0 ,
133135 ch_pid = ChPid ,
134136 monitor_ref = MonitorRef ,
135137 unacked_messages = dict :new (),
@@ -166,6 +168,7 @@ record_current_channel_tx(ChPid, Txn) ->
166168deliver_immediately (Message , Delivered ,
167169 State = # q {q = # amqqueue {name = QName },
168170 round_robin = RoundRobin ,
171+ blocked_consumers = BlockedConsumers ,
169172 next_msg_id = NextId }) ->
170173 ? LOGDEBUG (" AMQQUEUE ~p DELIVERY:~n~p~n " , [QName , Message ]),
171174 case queue :out (RoundRobin ) of
@@ -187,18 +190,30 @@ deliver_immediately(Message, Delivered,
187190 NewC = C # cr {unsent_message_count = Count + 1 ,
188191 unacked_messages = NewUAM },
189192 store_ch_record (NewC ),
190- NewConsumers =
193+ { NewRoundRobin , NewBlockedConsumers } =
191194 case ch_record_state_transition (C , NewC ) of
192- ok -> queue :in (QEntry , RoundRobinTail );
193- block -> block_consumers (ChPid , RoundRobinTail )
195+ ok -> {queue :in (QEntry , RoundRobinTail ),
196+ BlockedConsumers };
197+ block ->
198+ {RoundRobin1 , BlockedConsumers1 } =
199+ move_consumers (ChPid ,
200+ RoundRobinTail ,
201+ BlockedConsumers ),
202+ {RoundRobin1 ,
203+ queue :in (QEntry , BlockedConsumers1 )}
194204 end ,
195- {offered , AckRequired , State # q {round_robin = NewConsumers ,
196- next_msg_id = NextId + 1 }};
205+ {offered , AckRequired ,
206+ State # q {round_robin = NewRoundRobin ,
207+ blocked_consumers = NewBlockedConsumers ,
208+ next_msg_id = NextId + 1 }};
197209 false ->
198210 store_ch_record (C # cr {is_limit_active = true }),
199- NewConsumers = block_consumers (ChPid , RoundRobinTail ),
200- deliver_immediately (Message , Delivered ,
201- State # q {round_robin = NewConsumers })
211+ {NewRoundRobin , NewBlockedConsumers } =
212+ move_consumers (ChPid , RoundRobin , BlockedConsumers ),
213+ deliver_immediately (
214+ Message , Delivered ,
215+ State # q {round_robin = NewRoundRobin ,
216+ blocked_consumers = NewBlockedConsumers })
202217 end ;
203218 {empty , _ } ->
204219 {not_offered , State }
@@ -234,22 +249,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
234249 run_poke_burst (queue :join (MessageBuffer , queue :from_list (Messages )),
235250 State ).
236251
237- block_consumers (ChPid , RoundRobin ) ->
238- % %?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
239- queue :from_list (lists :filter (fun ({CP , _ }) -> CP /= ChPid end ,
240- queue :to_list (RoundRobin ))).
241-
242- unblock_consumers (ChPid , Consumers , RoundRobin ) ->
243- % %?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
244- queue :join (RoundRobin ,
245- queue :from_list ([{ChPid , Con } || Con <- Consumers ])).
252+ add_consumer (ChPid , Consumer , Queue ) -> queue :in ({ChPid , Consumer }, Queue ).
246253
247- block_consumer (ChPid , ConsumerTag , RoundRobin ) ->
248- % %?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
254+ remove_consumer (ChPid , ConsumerTag , Queue ) ->
255+ % % TODO: replace this with queue:filter/2 once we move to R12
249256 queue :from_list (lists :filter (
250257 fun ({CP , # consumer {tag = CT }}) ->
251258 (CP /= ChPid ) or (CT /= ConsumerTag )
252- end , queue :to_list (RoundRobin ))).
259+ end , queue :to_list (Queue ))).
260+
261+ remove_consumers (ChPid , Queue ) ->
262+ % % TODO: replace this with queue:filter/2 once we move to R12
263+ queue :from_list (lists :filter (fun ({CP , _ }) -> CP /= ChPid end ,
264+ queue :to_list (Queue ))).
265+
266+ move_consumers (ChPid , From , To ) ->
267+ {Kept , Removed } = lists :partition (fun ({CP , _ }) -> CP /= ChPid end ,
268+ queue :to_list (From )),
269+ {queue :from_list (Kept ), queue :join (To , queue :from_list (Removed ))}.
253270
254271possibly_unblock (State , ChPid , Update ) ->
255272 case lookup_ch (ChPid ) of
@@ -260,71 +277,47 @@ possibly_unblock(State, ChPid, Update) ->
260277 store_ch_record (NewC ),
261278 case ch_record_state_transition (C , NewC ) of
262279 ok -> State ;
263- unblock -> NewRR = unblock_consumers (ChPid ,
264- NewC # cr .consumers ,
265- State # q .round_robin ),
266- run_poke_burst (State # q {round_robin = NewRR })
280+ unblock -> {NewBlockedeConsumers , NewRoundRobin } =
281+ move_consumers (ChPid ,
282+ State # q .blocked_consumers ,
283+ State # q .round_robin ),
284+ run_poke_burst (
285+ State # q {round_robin = NewRoundRobin ,
286+ blocked_consumers = NewBlockedeConsumers })
267287 end
268288 end .
269289
270- check_auto_delete (State = # q {q = # amqqueue {auto_delete = false }}) ->
271- {continue , State };
272- check_auto_delete (State = # q {has_had_consumers = false }) ->
273- {continue , State };
274- check_auto_delete (State = # q {round_robin = RoundRobin }) ->
275- % The clauses above rule out cases where no-one has consumed from
276- % this queue yet, and cases where we are not an auto_delete queue
277- % in any case. Thus it remains to check whether we have any active
278- % listeners at this point.
279- case queue :is_empty (RoundRobin ) of
280- true ->
281- % There are no waiting listeners. It's possible that we're
282- % completely unused. Check.
283- case is_unused () of
284- true ->
285- % There are no active consumers at this
286- % point. This is the signal to autodelete.
287- {stop , State };
288- false ->
289- % There is at least one active consumer, so we
290- % shouldn't delete ourselves.
291- {continue , State }
292- end ;
293- false ->
294- % There are some waiting listeners, thus we are not
295- % unused, so can continue life as normal without needing
296- % to check the process dictionary.
297- {continue , State }
298- end .
290+ should_auto_delete (# q {q = # amqqueue {auto_delete = false }}) -> false ;
291+ should_auto_delete (# q {has_had_consumers = false }) -> false ;
292+ should_auto_delete (State ) -> is_unused (State ).
299293
300- handle_ch_down (DownPid , State = # q {exclusive_consumer = Holder ,
301- round_robin = ActiveConsumers }) ->
294+ handle_ch_down (DownPid , State = # q {exclusive_consumer = Holder }) ->
302295 case lookup_ch (DownPid ) of
303296 not_found -> noreply (State );
304297 # cr {monitor_ref = MonitorRef , ch_pid = ChPid , txn = Txn ,
305298 unacked_messages = UAM } ->
306- NewActive = block_consumers (ChPid , ActiveConsumers ),
307299 erlang :demonitor (MonitorRef ),
308300 erase ({ch , ChPid }),
309301 case Txn of
310302 none -> ok ;
311303 _ -> ok = rollback_work (Txn , qname (State )),
312304 erase_tx (Txn )
313305 end ,
314- case check_auto_delete (
315- deliver_or_enqueue_n (
316- [{Message , true } ||
317- {_Messsage_id , Message } <- dict :to_list (UAM )],
318- State # q {
319- exclusive_consumer = case Holder of
320- {ChPid , _ } -> none ;
321- Other -> Other
322- end ,
323- round_robin = NewActive })) of
324- {continue , NewState } ->
325- noreply (NewState );
326- {stop , NewState } ->
327- {stop , normal , NewState }
306+ NewState =
307+ deliver_or_enqueue_n (
308+ [{Message , true } ||
309+ {_Messsage_id , Message } <- dict :to_list (UAM )],
310+ State # q {
311+ exclusive_consumer = case Holder of
312+ {ChPid , _ } -> none ;
313+ Other -> Other
314+ end ,
315+ round_robin = remove_consumers (ChPid , State # q .round_robin ),
316+ blocked_consumers = remove_consumers (
317+ ChPid , State # q .blocked_consumers )}),
318+ case should_auto_delete (NewState ) of
319+ false -> noreply (NewState );
320+ true -> {stop , normal , NewState }
328321 end
329322 end .
330323
@@ -337,12 +330,12 @@ check_queue_owner(none, _) -> ok;
337330check_queue_owner ({ReaderPid , _ }, ReaderPid ) -> ok ;
338331check_queue_owner ({_ , _ }, _ ) -> mismatch .
339332
340- check_exclusive_access ({_ChPid , _ConsumerTag }, _ExclusiveConsume ) ->
333+ check_exclusive_access ({_ChPid , _ConsumerTag }, _ExclusiveConsume , _State ) ->
341334 in_use ;
342- check_exclusive_access (none , false ) ->
335+ check_exclusive_access (none , false , _State ) ->
343336 ok ;
344- check_exclusive_access (none , true ) ->
345- case is_unused () of
337+ check_exclusive_access (none , true , State ) ->
338+ case is_unused (State ) of
346339 true -> ok ;
347340 false -> in_use
348341 end .
@@ -367,16 +360,8 @@ run_poke_burst(MessageBuffer, State) ->
367360 State # q {message_buffer = MessageBuffer }
368361 end .
369362
370- is_unused () ->
371- is_unused1 (get ()).
372-
373- is_unused1 ([]) ->
374- true ;
375- is_unused1 ([{{ch , _ }, # cr {consumers = Consumers }} | _Rest ])
376- when Consumers /= [] ->
377- false ;
378- is_unused1 ([_ | Rest ]) ->
379- is_unused1 (Rest ).
363+ is_unused (State ) -> queue :is_empty (State # q .round_robin ) andalso
364+ queue :is_empty (State # q .blocked_consumers ).
380365
381366maybe_send_reply (_ChPid , undefined ) -> ok ;
382367maybe_send_reply (ChPid , Msg ) -> ok = rabbit_channel :send_command (ChPid , Msg ).
@@ -535,9 +520,8 @@ i(messages, State) ->
535520i (acks_uncommitted , _ ) ->
536521 lists :sum ([length (Pending ) ||
537522 # tx {pending_acks = Pending } <- all_tx_record ()]);
538- i (consumers , _ ) ->
539- lists :sum ([length (Consumers ) ||
540- # cr {consumers = Consumers } <- all_ch_record ()]);
523+ i (consumers , State ) ->
524+ queue :len (State # q .round_robin ) + queue :len (State # q .blocked_consumers );
541525i (transactions , _ ) ->
542526 length (all_tx_record ());
543527i (memory , _ ) ->
@@ -619,22 +603,22 @@ handle_call({basic_get, ChPid, NoAck}, _From,
619603handle_call ({basic_consume , NoAck , ReaderPid , ChPid , LimiterPid ,
620604 ConsumerTag , ExclusiveConsume , OkMsg },
621605 _From , State = # q {owner = Owner ,
622- exclusive_consumer = ExistingHolder ,
623- round_robin = RoundRobin }) ->
606+ exclusive_consumer = ExistingHolder }) ->
624607 case check_queue_owner (Owner , ReaderPid ) of
625608 mismatch ->
626609 reply ({error , queue_owned_by_another_connection }, State );
627610 ok ->
628- case check_exclusive_access (ExistingHolder , ExclusiveConsume ) of
611+ case check_exclusive_access (ExistingHolder , ExclusiveConsume ,
612+ State ) of
629613 in_use ->
630614 reply ({error , exclusive_consume_unavailable }, State );
631615 ok ->
632- C = # cr {consumers = Consumers } = ch_record (ChPid ),
616+ C = # cr {consumer_count = ConsumerCount } = ch_record (ChPid ),
633617 Consumer = # consumer {tag = ConsumerTag ,
634618 ack_required = not (NoAck )},
635- store_ch_record (C # cr {consumers = [ Consumer | Consumers ] ,
619+ store_ch_record (C # cr {consumer_count = ConsumerCount + 1 ,
636620 limiter_pid = LimiterPid }),
637- if Consumers == [] ->
621+ if ConsumerCount == 0 ->
638622 ok = rabbit_limiter :register (LimiterPid , self ());
639623 true ->
640624 ok
@@ -648,46 +632,49 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
648632 ok = maybe_send_reply (ChPid , OkMsg ),
649633 State2 =
650634 case is_ch_blocked (C ) of
651- true -> State1 ;
635+ true -> State1 # q {
636+ blocked_consumers =
637+ add_consumer (
638+ ChPid , Consumer ,
639+ State1 # q .blocked_consumers )};
652640 false -> run_poke_burst (
653641 State1 # q {
654- round_robin = queue :in (
655- {ChPid , Consumer },
656- RoundRobin )})
642+ round_robin =
643+ add_consumer (
644+ ChPid , Consumer ,
645+ State1 # q .round_robin )})
657646 end ,
658647 reply (ok , State2 )
659648 end
660649 end ;
661650
662651handle_call ({basic_cancel , ChPid , ConsumerTag , OkMsg }, _From ,
663- State = # q {exclusive_consumer = Holder ,
664- round_robin = RoundRobin }) ->
652+ State = # q {exclusive_consumer = Holder }) ->
665653 case lookup_ch (ChPid ) of
666654 not_found ->
667655 ok = maybe_send_reply (ChPid , OkMsg ),
668656 reply (ok , State );
669- C = # cr {consumers = Consumers , limiter_pid = LimiterPid } ->
670- NewConsumers = lists :filter
671- (fun (# consumer {tag = CT }) -> CT /= ConsumerTag end ,
672- Consumers ),
673- store_ch_record (C # cr {consumers = NewConsumers }),
674- if NewConsumers == [] ->
657+ C = # cr {consumer_count = ConsumerCount , limiter_pid = LimiterPid } ->
658+ store_ch_record (C # cr {consumer_count = ConsumerCount - 1 }),
659+ if ConsumerCount == 1 ->
675660 ok = rabbit_limiter :unregister (LimiterPid , self ());
676661 true ->
677662 ok
678663 end ,
679664 ok = maybe_send_reply (ChPid , OkMsg ),
680- case check_auto_delete (
681- State # q {exclusive_consumer = cancel_holder (ChPid ,
682- ConsumerTag ,
683- Holder ),
684- round_robin = block_consumer (ChPid ,
685- ConsumerTag ,
686- RoundRobin )}) of
687- {continue , State1 } ->
688- reply (ok , State1 );
689- {stop , State1 } ->
690- {stop , normal , ok , State1 }
665+ NewState =
666+ State # q {exclusive_consumer = cancel_holder (ChPid ,
667+ ConsumerTag ,
668+ Holder ),
669+ round_robin = remove_consumer (
670+ ChPid , ConsumerTag ,
671+ State # q .round_robin ),
672+ blocked_consumers = remove_consumer (
673+ ChPid , ConsumerTag ,
674+ State # q .blocked_consumers )},
675+ case should_auto_delete (NewState ) of
676+ false -> reply (ok , NewState );
677+ true -> {stop , normal , ok , NewState }
691678 end
692679 end ;
693680
@@ -699,7 +686,7 @@ handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
699686handle_call ({delete , IfUnused , IfEmpty }, _From ,
700687 State = # q {message_buffer = MessageBuffer }) ->
701688 IsEmpty = queue :is_empty (MessageBuffer ),
702- IsUnused = is_unused (),
689+ IsUnused = is_unused (State ),
703690 if
704691 IfEmpty and not (IsEmpty ) ->
705692 reply ({error , not_empty }, State );
@@ -718,7 +705,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
718705 exclusive_consumer = Holder }) ->
719706 case Owner of
720707 none ->
721- case check_exclusive_access (Holder , true ) of
708+ case check_exclusive_access (Holder , true , State ) of
722709 in_use ->
723710 % % FIXME: Is this really the right answer? What if
724711 % % an active consumer's reader is actually the
@@ -794,10 +781,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
794781 noreply (
795782 possibly_unblock (
796783 State , ChPid ,
797- fun (C = # cr {consumers = Consumers ,
784+ fun (C = # cr {consumer_count = ConsumerCount ,
798785 limiter_pid = OldLimiterPid ,
799786 is_limit_active = Limited }) ->
800- if Consumers =/= [] andalso OldLimiterPid == undefined ->
787+ if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
801788 ok = rabbit_limiter :register (LimiterPid , self ());
802789 true ->
803790 ok
0 commit comments