1313
1414-define (STATE , ? MODULE ).
1515-record (? STATE , {
16- % % the current master pid
17- pid :: undefined | pid (),
16+ pid :: pid (),
1817 unconfirmed = #{} :: #{non_neg_integer () => # msg_status {}},
1918 monitored = #{} :: #{pid () => ok }
2019 }).
5049 format /2 ,
5150 init /1 ,
5251 close /1 ,
53- update /2 ,
5452 consume /3 ,
5553 cancel /3 ,
5654 handle_event /3 ,
@@ -295,16 +293,6 @@ init(Q) when ?amqqueue_is_classic(Q) ->
295293close (_State ) ->
296294 ok .
297295
298- -spec update (amqqueue :amqqueue () | amqqueue :target (), state ()) -> state ().
299- update (Q , #? STATE {pid = Pid } = State ) ->
300- case amqqueue :get_pid (Q ) of
301- Pid ->
302- State ;
303- NewPid ->
304- % % master pid is different, update
305- State #? STATE {pid = NewPid }
306- end .
307-
308296consume (Q , Spec , State0 ) when ? amqqueue_is_classic (Q ) ->
309297 QPid = amqqueue :get_pid (Q ),
310298 QRef = amqqueue :get_name (Q ),
@@ -326,9 +314,8 @@ consume(Q, Spec, State0) when ?amqqueue_is_classic(Q) ->
326314 ExclusiveConsume , Args , OkMsg , ActingUser },
327315 infinity ]}) of
328316 ok ->
329- % % TODO: track pids as they change
330- State = ensure_monitor (QPid , QRef , State0 ),
331- {ok , State #? STATE {pid = QPid }};
317+ State = State0 #? STATE {pid = QPid },
318+ {ok , ensure_monitor (QRef , State )};
332319 {error , exclusive_consume_unavailable } ->
333320 {error , access_refused , " ~ts in exclusive use" ,
334321 [rabbit_misc :rs (QRef )]};
@@ -364,10 +351,12 @@ cancel(Q, Spec, State) ->
364351 OkMsg = maps :get (ok_msg , Spec , undefined ),
365352 {basic_cancel , self (), ConsumerTag , OkMsg , ActingUser }
366353 end ,
367- case delegate :invoke (amqqueue :get_pid (Q ),
368- {gen_server2 , call , [Request , infinity ]}) of
369- ok -> {ok , State };
370- Err -> Err
354+ Pid = amqqueue :get_pid (Q ),
355+ case delegate :invoke (Pid , {gen_server2 , call , [Request , infinity ]}) of
356+ ok ->
357+ {ok , State #? STATE {pid = Pid }};
358+ Err ->
359+ Err
371360 end .
372361
373362-spec settle (rabbit_amqqueue :name (), rabbit_queue_type :settle_op (),
@@ -502,19 +491,21 @@ deliver(Qs0, Msg0, Options) ->
502491 delegate :invoke_no_result (MPids , {gen_server2 , cast , [MMsg ]}),
503492 {Qs , []}.
504493
505- -spec dequeue (rabbit_amqqueue : name (), NoAck :: boolean (),
494+ -spec dequeue (amqqueue : amqqueue (), NoAck :: boolean (),
506495 LimiterPid :: pid (), rabbit_types :ctag (), state ()) ->
507496 {ok , Count :: non_neg_integer (), rabbit_amqqueue :qmsg (), state ()} |
508497 {empty , state ()}.
509- dequeue (QName , NoAck , LimiterPid , _CTag , State0 ) ->
510- QPid = State0 #? STATE .pid ,
511- State1 = ensure_monitor (QPid , QName , State0 ),
498+ dequeue (Q , NoAck , LimiterPid , _CTag , State0 ) ->
499+ QName = amqqueue :get_name (Q ),
500+ QPid = amqqueue :get_pid (Q ),
501+ State1 = State0 #? STATE {pid = QPid },
502+ State = ensure_monitor (QName , State1 ),
512503 case delegate :invoke (QPid , {gen_server2 , call ,
513504 [{basic_get , self (), NoAck , LimiterPid }, infinity ]}) of
514505 empty ->
515- {empty , State1 };
506+ {empty , State };
516507 {ok , Count , Msg } ->
517- {ok , Count , Msg , State1 }
508+ {ok , Count , Msg , State }
518509 end .
519510
520511-spec state_info (state ()) -> #{atom () := term ()}.
@@ -603,9 +594,10 @@ qpids(Qs, Confirm, MsgNo) ->
603594 fun ({Q , S0 }, {MPidAcc , Qs0 }) ->
604595 QPid = amqqueue :get_pid (Q ),
605596 QRef = amqqueue :get_name (Q ),
606- S1 = ensure_monitor (QPid , QRef , S0 ),
607597 % % confirm record only if necessary
608- S = case S1 of
598+ S = case S0 of
599+ stateless ->
600+ S0 ;
609601 #? STATE {unconfirmed = U0 } ->
610602 Rec = [QPid ],
611603 U = case Confirm of
@@ -614,10 +606,9 @@ qpids(Qs, Confirm, MsgNo) ->
614606 true ->
615607 U0 #{MsgNo => # msg_status {pending = Rec }}
616608 end ,
617- S1 #? STATE {pid = QPid ,
618- unconfirmed = U };
619- stateless ->
620- S1
609+ S1 = S0 #? STATE {pid = QPid ,
610+ unconfirmed = U },
611+ ensure_monitor (QRef , S1 )
621612 end ,
622613 {[QPid | MPidAcc ], [{Q , S } | Qs0 ]}
623614 end , {[], []}, Qs ).
@@ -728,14 +719,15 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
728719update_msg_status (down , Pid , # msg_status {pending = P } = S ) ->
729720 S # msg_status {pending = lists :delete (Pid , P )}.
730721
731- ensure_monitor (_ , _ , State = stateless ) ->
732- State ;
733- ensure_monitor (Pid , _ , State = #? STATE {monitored = Monitored })
734- when is_map_key (Pid , Monitored ) ->
735- State ;
736- ensure_monitor (Pid , QName , State = #? STATE {monitored = Monitored }) ->
737- _ = erlang :monitor (process , Pid , [{tag , {'DOWN' , QName }}]),
738- State #? STATE {monitored = Monitored #{Pid => ok }}.
722+ ensure_monitor (QName , #? STATE {pid = Pid ,
723+ monitored = Monitored } = State ) ->
724+ case is_map_key (Pid , Monitored ) of
725+ true ->
726+ State ;
727+ false ->
728+ _Ref = erlang :monitor (process , Pid , [{tag , {'DOWN' , QName }}]),
729+ State #? STATE {monitored = Monitored #{Pid => ok }}
730+ end .
739731
740732% % part of channel <-> queue api
741733confirm_to_sender (Pid , QName , MsgSeqNos ) ->
0 commit comments