1111-compile (inline_list_funcs ).
1212-compile (inline ).
1313-compile ({no_auto_import , [apply / 3 ]}).
14+ -dialyzer ({nowarn_function , convert_v7_to_v8 / 2 }).
1415-dialyzer (no_improper_lists ).
1516
1617-include (" rabbit_fifo.hrl" ).
@@ -483,95 +484,146 @@ apply_(#{index := _Idx}, #garbage_collection{}, State) ->
483484 {State , ok , [{aux , garbage_collection }]};
484485apply_ (Meta , {timeout , expire_msgs }, State ) ->
485486 checkout (Meta , State , State , []);
486- apply_ (#{system_time := Ts } = Meta ,
487- {down , Pid , noconnection },
488- #? STATE {consumers = Cons0 ,
489- cfg = # cfg {consumer_strategy = single_active },
490- waiting_consumers = Waiting0 ,
491- enqueuers = Enqs0 } = State0 ) ->
492- Node = node (Pid ),
493- % % if the pid refers to an active or cancelled consumer,
494- % % mark it as suspected and return it to the waiting queue
495- {State1 , Effects0 } =
496- maps :fold (
497- fun (CKey , ? CONSUMER_PID (P ) = C0 , {S0 , E0 })
498- when node (P ) =:= Node ->
499- % % the consumer should be returned to waiting
500- % % and checked out messages should be returned
501- Effs = consumer_update_active_effects (
502- S0 , C0 , false , suspected_down , E0 ),
503- % % TODO: set a timer instead of reaturn all here to allow
504- % % a disconnected node a configurable bit of time to be
505- % % reconnected
506- {St , Effs1 } = return_all (Meta , S0 , Effs , CKey , C0 , false ),
507- % % if the consumer was cancelled there is a chance it got
508- % % removed when returning hence we need to be defensive here
509- Waiting = case St #? STATE .consumers of
510- #{CKey := C } ->
511- Waiting0 ++ [{CKey , C }];
512- _ ->
513- Waiting0
514- end ,
515- {St #? STATE {consumers = maps :remove (CKey , St #? STATE .consumers ),
516- waiting_consumers = Waiting ,
517- last_active = Ts },
518- Effs1 };
519- (_ , _ , S ) ->
520- S
521- end , {State0 , []}, maps :iterator (Cons0 , ordered )),
522- WaitingConsumers = update_waiting_consumer_status (Node , State1 ,
523- suspected_down ),
524-
525- % % select a new consumer from the waiting queue and run a checkout
526- State2 = State1 #? STATE {waiting_consumers = WaitingConsumers },
527- {State , Effects1 } = activate_next_consumer (State2 , Effects0 ),
528-
529- % % mark any enquers as suspected
530- Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
531- E # enqueuer {status = suspected_down };
532- (_ , E ) -> E
533- end , Enqs0 ),
534- Effects = [{monitor , node , Node } | Effects1 ],
535- checkout (Meta , State0 , State #? STATE {enqueuers = Enqs }, Effects );
487+ % apply_(#{system_time := Ts} = Meta,
488+ % {down, Pid, noconnection},
489+ % #?STATE{consumers = Cons0,
490+ % cfg = #cfg{consumer_strategy = single_active},
491+ % waiting_consumers = Waiting0,
492+ % enqueuers = Enqs0} = State0) ->
493+ % Node = node(Pid),
494+ % %% if the pid refers to an active or cancelled consumer,
495+ % %% mark it as suspected and return it to the waiting queue
496+ % {State1, Effects0} =
497+ % maps:fold(
498+ % fun(CKey, ?CONSUMER_PID(P) = #consumer{status = Status} = C0, {S0, E0})
499+ % when is_atom(Status) andalso node(P) =:= Node ->
500+ % %% the consumer should be returned to waiting
501+ % %% and checked out messages should be returned
502+ % Effs = consumer_update_active_effects(
503+ % S0, C0, false, { suspected_down, Status} , E0),
504+ % %% TODO: set a timer instead of reaturn all here to allow
505+ % %% a disconnected node a configurable bit of time to be
506+ % %% reconnected
507+ % {St, Effs1} = return_all(Meta, S0, Effs, CKey, C0, false),
508+ % %% if the consumer was cancelled there is a chance it got
509+ % %% removed when returning hence we need to be defensive here
510+ % Waiting = case St#?STATE.consumers of
511+ % #{CKey := C} ->
512+ % Waiting0 ++ [{CKey, C}];
513+ % _ ->
514+ % Waiting0
515+ % end,
516+ % {St#?STATE{consumers = maps:remove(CKey, St#?STATE.consumers),
517+ % waiting_consumers = Waiting,
518+ % last_active = Ts},
519+ % Effs1};
520+ % (_, _, S) ->
521+ % S
522+ % end, {State0, []}, maps:iterator(Cons0, ordered)),
523+ % WaitingConsumers = update_waiting_consumer_status(Node, State1,
524+ % { suspected_down, up} ),
525+
526+ % %% select a new consumer from the waiting queue and run a checkout
527+ % State2 = State1#?STATE{waiting_consumers = WaitingConsumers},
528+ % {State, Effects1} = activate_next_consumer(State2, Effects0),
529+
530+ % %% mark any enquers as suspected
531+ % Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
532+ % E#enqueuer{status = suspected_down};
533+ % (_, E) -> E
534+ % end, Enqs0),
535+ % Effects = [{monitor, node, Node} | Effects1],
536+ % checkout(Meta, State0, State#?STATE{enqueuers = Enqs}, Effects);
536537apply_ (#{system_time := Ts } = Meta ,
537538 {down , Pid , noconnection },
538539 #? STATE {consumers = Cons0 ,
539540 enqueuers = Enqs0 } = State0 ) ->
541+
540542 % % A node has been disconnected. This doesn't necessarily mean that
541543 % % any processes on this node are down, they _may_ come back so here
542544 % % we just mark them as suspected (effectively deactivated)
543545 % % and return all checked out messages to the main queue for delivery to any
544546 % % live consumers
545- % %
546- % % all pids for the disconnected node will be marked as suspected not just
547- % % the one we got the `down' command for
547+
548548 Node = node (Pid ),
549549
550- {State , Effects1 } =
550+ {Cons , Effects1 } =
551551 maps :fold (
552552 fun (CKey , # consumer {cfg = # consumer_cfg {pid = P },
553- status = up } = C0 ,
554- {St0 , Eff }) when node (P ) =:= Node ->
555- C = C0 # consumer {status = suspected_down },
556- {St , Eff0 } = return_all (Meta , St0 , Eff , CKey , C , false ),
557- Eff1 = consumer_update_active_effects (St , C , false ,
553+ status = Status } = C0 ,
554+ {Cns0 , Eff }) when P =:= Pid ->
555+ TargetStatus = case Status of
556+ {suspected_down , T } -> T ;
557+ _ ->
558+ Status
559+ end ,
560+ C = C0 # consumer {status = {suspected_down , TargetStatus }},
561+ % down consumer still has messages assigned
562+ % TODO: make timeout configurable
563+ Eff0 = [{timer , {consumer_down_timeout , CKey }, 10_000 } | Eff ],
564+ Eff1 = consumer_update_active_effects (State0 , C , false ,
558565 suspected_down , Eff0 ),
559- {St , Eff1 };
560- (_ , _ , {St , Eff }) ->
561- {St , Eff }
562- end , {State0 , []}, maps :iterator (Cons0 , ordered )),
563- Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
564- E # enqueuer {status = suspected_down };
565- (_ , E ) -> E
566- end , Enqs0 ),
566+ {Cns0 #{CKey => C }, Eff1 };
567+ (_ , _ , St ) ->
568+ St
569+ end , {Cons0 , []}, maps :iterator (Cons0 , ordered )),
570+ Enqs = case Enqs0 of
571+ #{Pid := E } ->
572+ Enqs0 #{Pid := E # enqueuer {status = suspected_down }};
573+ _ ->
574+ Enqs0
575+ end ,
567576
577+ WaitingConsumers = update_waiting_consumer_status (Pid , State0 ,
578+ {suspected_down , up }),
568579 % Monitor the node so that we can "unsuspect" these processes when the node
569580 % comes back, then re-issue all monitors and discover the final fate of
570581 % these processes
571-
572582 Effects = [{monitor , node , Node } | Effects1 ],
573- checkout (Meta , State0 , State #? STATE {enqueuers = Enqs ,
574- last_active = Ts }, Effects );
583+ checkout (Meta , State0 , State0 #? STATE {enqueuers = Enqs ,
584+ waiting_consumers = WaitingConsumers ,
585+ consumers = Cons ,
586+ last_active = Ts }, Effects );
587+ apply_ (Meta , {timeout , {consumer_down_timeout , CKey }},
588+ #? STATE {cfg = # cfg {consumer_strategy = competing },
589+ consumers = Consumers } = State0 ) ->
590+
591+ case find_consumer (CKey , Consumers ) of
592+ {_CKey , # consumer {status = {suspected_down , _ }} = Consumer } ->
593+ % % the consumer is still suspected and has timed out
594+ % % return all messages
595+ {State1 , Effects0 } = return_all (Meta , State0 , [], CKey ,
596+ Consumer , false ),
597+ checkout (Meta , State0 , State1 , Effects0 );
598+ _ ->
599+ {State0 , []}
600+ end ;
601+ apply_ (#{system_time := Ts } = Meta , {timeout , {consumer_down_timeout , CKey }},
602+ #? STATE {cfg = # cfg {consumer_strategy = single_active },
603+ waiting_consumers = Waiting0 ,
604+ consumers = Consumers } = State0 ) ->
605+
606+ case find_consumer (CKey , Consumers ) of
607+ {_CKey , # consumer {status = {suspected_down , Status }} = Consumer } ->
608+ % % the consumer is still suspected and has timed out
609+ % % return all messages
610+ {State1 , Effects0 } = return_all (Meta , State0 , [], CKey ,
611+ Consumer , false ),
612+ Waiting = case State1 #? STATE .consumers of
613+ #{CKey := C } when Status =/= cancelled ->
614+ Waiting0 ++
615+ [{CKey , C # consumer {status = {suspected_down , up }}}];
616+ _ ->
617+ Waiting0
618+ end ,
619+ State2 = State1 #? STATE {consumers = maps :remove (CKey , State1 #? STATE .consumers ),
620+ waiting_consumers = Waiting ,
621+ last_active = Ts },
622+ {State , Effects1 } = activate_next_consumer (State2 , Effects0 ),
623+ checkout (Meta , State0 , State , Effects1 );
624+ _ ->
625+ {State0 , []}
626+ end ;
575627apply_ (Meta , {down , Pid , _Info }, State0 ) ->
576628 {State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 )),
577629 checkout (Meta , State0 , State1 , Effects1 );
@@ -581,33 +633,37 @@ apply_(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
581633 % % If we have suspected any processes of being
582634 % % down we should now re-issue the monitors for them to detect if they're
583635 % % actually down or not
584- Monitors = [{monitor , process , P }
585- || P <- suspected_pids_for (Node , State0 )],
636+ % % send leader change events to all disconnected enqueuers to prompt them
637+ % % to resend any messages stuck during disconnection,
638+ % % ofc it may not be a leader change per se but it has the same effect
639+ Effects0 = lists :flatten ([[{monitor , process , P },
640+ {send_msg , P , leader_change , ra_event }]
641+ || P <- suspected_pids_for (Node , State0 )]),
586642
587643 Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
588644 E # enqueuer {status = up };
589645 (_ , E ) -> E
590646 end , Enqs0 ),
591- % % send leader change events to all disconnected enqueuers to prompt them
592- % % to resend any messages stuck during disconnection,
593- % % ofc it may not be a leader change per se
594- Effects0 = maps :fold (fun (P , _E , Acc ) when node (P ) =:= Node ->
595- [{send_msg , P , leader_change , ra_event } | Acc ];
596- (_ , _E , Acc ) -> Acc
597- end , Monitors , Enqs0 ),
598647
599648 ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
600649 % % mark all consumers as up
601650 {State1 , Effects1 } =
602651 maps :fold (
603- fun (ConsumerKey , ? CONSUMER_PID (P ) = C , {SAcc , EAcc })
604- when (node (P ) =:= Node ) and
605- (C # consumer .status =/= cancelled ) ->
652+ fun (ConsumerKey ,
653+ ? CONSUMER_PID (P ) =
654+ # consumer {status = {suspected_down , NextStatus }} = C ,
655+ {SAcc , EAcc0 })
656+ when node (P ) =:= Node ->
606657 EAcc1 = ConsumerUpdateActiveFun (SAcc , ConsumerKey ,
607- C , true , up , EAcc ),
658+ C , true , NextStatus , EAcc0 ),
659+ % % cancel timers
660+ EAcc = [{timer ,
661+ {consumer_down_timeout , ConsumerKey },
662+ infinity } | EAcc1 ],
663+
608664 {update_or_remove_con (Meta , ConsumerKey ,
609- C # consumer {status = up },
610- SAcc ), EAcc1 };
665+ C # consumer {status = NextStatus },
666+ SAcc ), EAcc };
611667 (_ , _ , Acc ) ->
612668 Acc
613669 end , {State0 , Effects0 }, maps :iterator (Cons0 , ordered )),
@@ -697,8 +753,16 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{},
697753 Effs .
698754
699755convert_v7_to_v8 (#{} = _Meta , StateV7 ) ->
756+ % % the structure is intact for now
757+ Cons0 = element (#? STATE .consumers , StateV7 ),
758+ Cons = maps :map (fun (_CKey , # consumer {status = suspected_down } = C ) ->
759+ C # consumer {status = {suspected_down , up }};
760+ (_CKey , C ) ->
761+ C
762+ end , Cons0 ),
700763 StateV8 = StateV7 ,
701764 StateV8 #? STATE {discarded_bytes = 0 ,
765+ consumers = Cons ,
702766 unused_0 = ? NIL }.
703767
704768purge_node (Meta , Node , State , Effects ) ->
@@ -764,15 +828,16 @@ handle_waiting_consumer_down(Pid,
764828 State = State0 #? STATE {waiting_consumers = StillUp },
765829 {Effects , State }.
766830
767- update_waiting_consumer_status (Node ,
831+ update_waiting_consumer_status (DownPidOrNode ,
768832 #? STATE {waiting_consumers = WaitingConsumers },
769833 Status ) ->
770834 sort_waiting (
771- [case node (Pid ) of
772- Node ->
773- {ConsumerKey , Consumer # consumer {status = Status }};
774- _ ->
775- {ConsumerKey , Consumer }
835+ [if is_pid (DownPidOrNode ) andalso DownPidOrNode == Pid ->
836+ {ConsumerKey , Consumer # consumer {status = Status }};
837+ is_atom (DownPidOrNode ) andalso DownPidOrNode == node (Pid ) ->
838+ {ConsumerKey , Consumer # consumer {status = Status }};
839+ true ->
840+ {ConsumerKey , Consumer }
776841 end || {ConsumerKey , ? CONSUMER_PID (Pid ) = Consumer }
777842 <- WaitingConsumers , Consumer # consumer .status =/= cancelled ]).
778843
@@ -1222,7 +1287,9 @@ query_waiting_consumers(#?STATE{waiting_consumers = WaitingConsumers}) ->
12221287query_consumer_count (#? STATE {consumers = Consumers ,
12231288 waiting_consumers = WaitingConsumers }) ->
12241289 Up = maps :filter (fun (_ConsumerKey , # consumer {status = Status }) ->
1225- Status =/= suspected_down
1290+ % % TODO: should this really not include suspected
1291+ % % consumers?
1292+ is_atom (Status )
12261293 end , Consumers ),
12271294 maps :size (Up ) + length (WaitingConsumers ).
12281295
@@ -1235,8 +1302,8 @@ query_consumers(#?STATE{consumers = Consumers,
12351302 competing ->
12361303 fun (_ConsumerKey , # consumer {status = Status }) ->
12371304 case Status of
1238- suspected_down ->
1239- {false , Status };
1305+ { suspected_down , _ } ->
1306+ {false , suspected_down };
12401307 _ ->
12411308 {true , Status }
12421309 end
@@ -1523,7 +1590,10 @@ activate_next_consumer(#?STATE{consumers = Cons0,
15231590 end .
15241591
15251592active_consumer ({CKey , # consumer {status = Status } = Consumer , _I })
1526- when Status == up orelse Status == quiescing ->
1593+ when Status == up orelse
1594+ Status == quiescing orelse
1595+ Status == {suspected_down , up } orelse
1596+ Status == {suspected_down , quiescing } ->
15271597 {CKey , Consumer };
15281598active_consumer ({_CKey , # consumer {status = _ }, I }) ->
15291599 active_consumer (maps :next (I ));
@@ -1930,7 +2000,7 @@ return_one(Meta, MsgId, Msg0, DeliveryFailed, Anns,
19302000 State1 = State0 #? STATE {dlx = DlxState ,
19312001 discarded_bytes = DiscardedBytes0 - RetainedBytes },
19322002 {State , Effects } = complete (Meta , ConsumerKey , [MsgId ],
1933- Con0 , State1 , Effects0 ),
2003+ Con0 , State1 , Effects0 ),
19342004 {State , DlxEffects ++ Effects };
19352005 _ ->
19362006 Checked = maps :remove (MsgId , Checked0 ),
@@ -2711,7 +2781,7 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
27112781 waiting_consumers = WaitingConsumers0 }) ->
27122782 Cons = maps :fold (fun (_Key ,
27132783 # consumer {cfg = # consumer_cfg {pid = P },
2714- status = suspected_down },
2784+ status = { suspected_down , _ } },
27152785 Acc )
27162786 when node (P ) =:= Node ->
27172787 [P | Acc ];
@@ -2726,7 +2796,7 @@ suspected_pids_for(Node, #?STATE{consumers = Cons0,
27262796 end , Cons , maps :iterator (Enqs0 , ordered )),
27272797 lists :foldl (fun ({_Key ,
27282798 # consumer {cfg = # consumer_cfg {pid = P },
2729- status = suspected_down }}, Acc )
2799+ status = { suspected_down , _ } }}, Acc )
27302800 when node (P ) =:= Node ->
27312801 [P | Acc ];
27322802 (_ , Acc ) -> Acc
@@ -2737,7 +2807,7 @@ is_expired(Ts, #?STATE{cfg = #cfg{expires = Expires},
27372807 consumers = Consumers })
27382808 when is_number (LastActive ) andalso is_number (Expires ) ->
27392809 % % TODO: should it be active consumers?
2740- Active = maps :filter (fun (_ , # consumer {status = suspected_down }) ->
2810+ Active = maps :filter (fun (_ , # consumer {status = { suspected_down , _ } }) ->
27412811 false ;
27422812 (_ , _ ) ->
27432813 true
0 commit comments