@@ -514,7 +514,8 @@ apply(#{index := _Idx}, #garbage_collection{}, State) ->
514514 {State , ok , [{aux , garbage_collection }]};
515515apply (Meta , {timeout , expire_msgs }, State ) ->
516516 checkout (Meta , State , State , []);
517- apply (#{system_time := Ts } = Meta ,
517+ apply (#{machine_version := Vsn ,
518+ system_time := Ts } = Meta ,
518519 {down , Pid , noconnection },
519520 #? STATE {consumers = Cons0 ,
520521 cfg = # cfg {consumer_strategy = single_active },
@@ -524,7 +525,7 @@ apply(#{system_time := Ts} = Meta,
524525 % % if the pid refers to an active or cancelled consumer,
525526 % % mark it as suspected and return it to the waiting queue
526527 {State1 , Effects0 } =
527- maps :fold (
528+ rabbit_fifo_maps :fold (
528529 fun (CKey , ? CONSUMER_PID (P ) = C0 , {S0 , E0 })
529530 when node (P ) =:= Node ->
530531 % % the consumer should be returned to waiting
@@ -546,7 +547,7 @@ apply(#{system_time := Ts} = Meta,
546547 Effs1 };
547548 (_ , _ , S ) ->
548549 S
549- end , {State0 , []}, Cons0 ),
550+ end , {State0 , []}, Cons0 , Vsn ),
550551 WaitingConsumers = update_waiting_consumer_status (Node , State1 ,
551552 suspected_down ),
552553
@@ -561,7 +562,8 @@ apply(#{system_time := Ts} = Meta,
561562 end , Enqs0 ),
562563 Effects = [{monitor , node , Node } | Effects1 ],
563564 checkout (Meta , State0 , State #? STATE {enqueuers = Enqs }, Effects );
564- apply (#{system_time := Ts } = Meta ,
565+ apply (#{machine_version := Vsn ,
566+ system_time := Ts } = Meta ,
565567 {down , Pid , noconnection },
566568 #? STATE {consumers = Cons0 ,
567569 enqueuers = Enqs0 } = State0 ) ->
@@ -576,7 +578,7 @@ apply(#{system_time := Ts} = Meta,
576578 Node = node (Pid ),
577579
578580 {State , Effects1 } =
579- maps :fold (
581+ rabbit_fifo_maps :fold (
580582 fun (CKey , # consumer {cfg = # consumer_cfg {pid = P },
581583 status = up } = C0 ,
582584 {St0 , Eff }) when node (P ) =:= Node ->
@@ -587,7 +589,7 @@ apply(#{system_time := Ts} = Meta,
587589 {St , Eff1 };
588590 (_ , _ , {St , Eff }) ->
589591 {St , Eff }
590- end , {State0 , []}, Cons0 ),
592+ end , {State0 , []}, Cons0 , Vsn ),
591593 Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
592594 E # enqueuer {status = suspected_down };
593595 (_ , E ) -> E
@@ -603,15 +605,17 @@ apply(#{system_time := Ts} = Meta,
603605apply (Meta , {down , Pid , _Info }, State0 ) ->
604606 {State1 , Effects1 } = activate_next_consumer (handle_down (Meta , Pid , State0 )),
605607 checkout (Meta , State0 , State1 , Effects1 );
606- apply (Meta , {nodeup , Node }, #? STATE {consumers = Cons0 ,
607- enqueuers = Enqs0 ,
608- service_queue = _SQ0 } = State0 ) ->
608+ apply (#{machine_version := Vsn } = Meta ,
609+ {nodeup , Node },
610+ #? STATE {consumers = Cons0 ,
611+ enqueuers = Enqs0 ,
612+ service_queue = _SQ0 } = State0 ) ->
609613 % % A node we are monitoring has come back.
610614 % % If we have suspected any processes of being
611615 % % down we should now re-issue the monitors for them to detect if they're
612616 % % actually down or not
613617 Monitors = [{monitor , process , P }
614- || P <- suspected_pids_for (Node , State0 )],
618+ || P <- suspected_pids_for (Node , Vsn , State0 )],
615619
616620 Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
617621 E # enqueuer {status = up };
@@ -620,17 +624,18 @@ apply(Meta, {nodeup, Node}, #?STATE{consumers = Cons0,
620624 ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
621625 % % mark all consumers as up
622626 {State1 , Effects1 } =
623- maps :fold (fun (ConsumerKey , ? CONSUMER_PID (P ) = C , {SAcc , EAcc })
624- when (node (P ) =:= Node ) and
625- (C # consumer .status =/= cancelled ) ->
626- EAcc1 = ConsumerUpdateActiveFun (SAcc , ConsumerKey ,
627- C , true , up , EAcc ),
628- {update_or_remove_con (Meta , ConsumerKey ,
629- C # consumer {status = up },
630- SAcc ), EAcc1 };
631- (_ , _ , Acc ) ->
632- Acc
633- end , {State0 , Monitors }, Cons0 ),
627+ rabbit_fifo_maps :fold (
628+ fun (ConsumerKey , ? CONSUMER_PID (P ) = C , {SAcc , EAcc })
629+ when (node (P ) =:= Node ) and
630+ (C # consumer .status =/= cancelled ) ->
631+ EAcc1 = ConsumerUpdateActiveFun (SAcc , ConsumerKey ,
632+ C , true , up , EAcc ),
633+ {update_or_remove_con (Meta , ConsumerKey ,
634+ C # consumer {status = up },
635+ SAcc ), EAcc1 };
636+ (_ , _ , Acc ) ->
637+ Acc
638+ end , {State0 , Monitors }, Cons0 , Vsn ),
634639 Waiting = update_waiting_consumer_status (Node , State1 , up ),
635640 State2 = State1 #? STATE {enqueuers = Enqs1 ,
636641 waiting_consumers = Waiting },
@@ -708,27 +713,29 @@ convert_v3_to_v4(#{} = _Meta, StateV3) ->
708713 msg_cache = rabbit_fifo_v3 :get_field (msg_cache , StateV3 ),
709714 unused_1 = []}.
710715
711- purge_node (Meta , Node , State , Effects ) ->
716+ purge_node (#{ machine_version : = Vsn } = Meta , Node , State , Effects ) ->
712717 lists :foldl (fun (Pid , {S0 , E0 }) ->
713718 {S , E } = handle_down (Meta , Pid , S0 ),
714719 {S , E0 ++ E }
715720 end , {State , Effects },
716- all_pids_for (Node , State )).
721+ all_pids_for (Node , Vsn , State )).
717722
718723% % any downs that are not noconnection
719- handle_down (Meta , Pid , #? STATE {consumers = Cons0 ,
720- enqueuers = Enqs0 } = State0 ) ->
724+ handle_down (#{machine_version := Vsn } = Meta ,
725+ Pid , #? STATE {consumers = Cons0 ,
726+ enqueuers = Enqs0 } = State0 ) ->
721727 % Remove any enqueuer for the down pid
722728 State1 = State0 #? STATE {enqueuers = maps :remove (Pid , Enqs0 )},
723729 {Effects1 , State2 } = handle_waiting_consumer_down (Pid , State1 ),
724730 % return checked out messages to main queue
725731 % Find the consumers for the down pid
726- DownConsumers = maps :keys (maps :filter (fun (_CKey , ? CONSUMER_PID (P )) ->
727- P =:= Pid
728- end , Cons0 )),
732+ DownConsumers = maps :filter (fun (_CKey , ? CONSUMER_PID (P )) ->
733+ P =:= Pid
734+ end , Cons0 ),
735+ DownConsumerKeys = rabbit_fifo_maps :keys (DownConsumers , Vsn ),
729736 lists :foldl (fun (ConsumerKey , {S , E }) ->
730737 cancel_consumer (Meta , ConsumerKey , S , E , down )
731- end , {State2 , Effects1 }, DownConsumers ).
738+ end , {State2 , Effects1 }, DownConsumerKeys ).
732739
733740consumer_active_flag_update_function (
734741 #? STATE {cfg = # cfg {consumer_strategy = competing }}) ->
@@ -916,14 +923,15 @@ get_checked_out(CKey, From, To, #?STATE{consumers = Consumers}) ->
916923 end .
917924
918925-spec version () -> pos_integer ().
919- version () -> 5 .
926+ version () -> 6 .
920927
921928which_module (0 ) -> rabbit_fifo_v0 ;
922929which_module (1 ) -> rabbit_fifo_v1 ;
923930which_module (2 ) -> rabbit_fifo_v3 ;
924931which_module (3 ) -> rabbit_fifo_v3 ;
925932which_module (4 ) -> ? MODULE ;
926- which_module (5 ) -> ? MODULE .
933+ which_module (5 ) -> ? MODULE ;
934+ which_module (6 ) -> ? MODULE .
927935
928936-define (AUX , aux_v3 ).
929937
@@ -2692,41 +2700,45 @@ all_nodes(#?STATE{consumers = Cons0,
26922700 Acc #{node (P ) => ok }
26932701 end , Nodes1 , WaitingConsumers0 )).
26942702
2695- all_pids_for (Node , #? STATE {consumers = Cons0 ,
2696- enqueuers = Enqs0 ,
2697- waiting_consumers = WaitingConsumers0 }) ->
2698- Cons = maps :fold (fun (_ , ? CONSUMER_PID (P ), Acc )
2699- when node (P ) =:= Node ->
2700- [P | Acc ];
2701- (_ , _ , Acc ) -> Acc
2702- end , [], Cons0 ),
2703- Enqs = maps :fold (fun (P , _ , Acc )
2704- when node (P ) =:= Node ->
2705- [P | Acc ];
2706- (_ , _ , Acc ) -> Acc
2707- end , Cons , Enqs0 ),
2703+ all_pids_for (Node , Vsn , #? STATE {consumers = Cons0 ,
2704+ enqueuers = Enqs0 ,
2705+ waiting_consumers = WaitingConsumers0 }) ->
2706+ Cons = rabbit_fifo_maps :fold (fun (_ , ? CONSUMER_PID (P ), Acc )
2707+ when node (P ) =:= Node ->
2708+ [P | Acc ];
2709+ (_ , _ , Acc ) ->
2710+ Acc
2711+ end , [], Cons0 , Vsn ),
2712+ Enqs = rabbit_fifo_maps :fold (fun (P , _ , Acc )
2713+ when node (P ) =:= Node ->
2714+ [P | Acc ];
2715+ (_ , _ , Acc ) ->
2716+ Acc
2717+ end , Cons , Enqs0 , Vsn ),
27082718 lists :foldl (fun ({_ , ? CONSUMER_PID (P )}, Acc )
27092719 when node (P ) =:= Node ->
27102720 [P | Acc ];
27112721 (_ , Acc ) -> Acc
27122722 end , Enqs , WaitingConsumers0 ).
27132723
2714- suspected_pids_for (Node , #? STATE {consumers = Cons0 ,
2715- enqueuers = Enqs0 ,
2716- waiting_consumers = WaitingConsumers0 }) ->
2717- Cons = maps :fold (fun (_Key ,
2718- # consumer {cfg = # consumer_cfg {pid = P },
2719- status = suspected_down },
2720- Acc )
2721- when node (P ) =:= Node ->
2722- [P | Acc ];
2723- (_ , _ , Acc ) -> Acc
2724- end , [], Cons0 ),
2725- Enqs = maps :fold (fun (P , # enqueuer {status = suspected_down }, Acc )
2726- when node (P ) =:= Node ->
2727- [P | Acc ];
2728- (_ , _ , Acc ) -> Acc
2729- end , Cons , Enqs0 ),
2724+ suspected_pids_for (Node , Vsn , #? STATE {consumers = Cons0 ,
2725+ enqueuers = Enqs0 ,
2726+ waiting_consumers = WaitingConsumers0 }) ->
2727+ Cons = rabbit_fifo_maps :fold (fun (_Key ,
2728+ # consumer {cfg = # consumer_cfg {pid = P },
2729+ status = suspected_down },
2730+ Acc )
2731+ when node (P ) =:= Node ->
2732+ [P | Acc ];
2733+ (_ , _ , Acc ) ->
2734+ Acc
2735+ end , [], Cons0 , Vsn ),
2736+ Enqs = rabbit_fifo_maps :fold (fun (P , # enqueuer {status = suspected_down }, Acc )
2737+ when node (P ) =:= Node ->
2738+ [P | Acc ];
2739+ (_ , _ , Acc ) ->
2740+ Acc
2741+ end , Cons , Enqs0 , Vsn ),
27302742 lists :foldl (fun ({_Key ,
27312743 # consumer {cfg = # consumer_cfg {pid = P },
27322744 status = suspected_down }}, Acc )
@@ -2783,7 +2795,10 @@ convert(Meta, 3, To, State) ->
27832795 convert (Meta , 4 , To , convert_v3_to_v4 (Meta , State ));
27842796convert (Meta , 4 , To , State ) ->
27852797 % % no conversion needed, this version only includes a logic change
2786- convert (Meta , 5 , To , State ).
2798+ convert (Meta , 5 , To , State );
2799+ convert (Meta , 5 , To , State ) ->
2800+ % % no conversion needed, this version only includes a logic change
2801+ convert (Meta , 6 , To , State ).
27872802
27882803smallest_raft_index (#? STATE {messages = Messages ,
27892804 ra_indexes = Indexes ,
0 commit comments