3939 handle_connection_down /2 ,
4040 consumer_groups /3 ,
4141 group_consumers /5 ,
42- overview /1 ]).
42+ overview /1 ,
43+ import_state /2 ]).
4344
4445-import (rabbit_stream_coordinator , [ra_local_query /1 ]).
4546
47+ -define (ACTIVE , active ).
48+ -define (WAITING , waiting ).
49+ -define (DEACTIVATING , deactivating ).
50+
4651% % Single Active Consumer API
4752-spec register_consumer (binary (),
4853 binary (),
@@ -231,7 +236,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231236 G1 = remove_from_group (Consumer , Group0 ),
232237 handle_consumer_removal (
233238 G1 , Stream , ConsumerName ,
234- is_active (Consumer # consumer .active ));
239+ is_active (Consumer # consumer .status ));
235240 false ->
236241 {Group0 , []}
237242 end ,
@@ -255,10 +260,10 @@ apply(#command_activate_consumer{vhost = VirtualHost,
255260 [{VirtualHost , Stream , ConsumerName }]),
256261 {undefined , []};
257262 Group0 ->
258- Group1 = update_consumers (Group0 , waiting ),
263+ Group1 = update_consumers (Group0 , ? WAITING ),
259264 # consumer {pid = Pid , subscription_id = SubId } =
260265 evaluate_active_consumer (Group1 ),
261- Group2 = update_consumer_state_in_group (Group1 , Pid , SubId , active ),
266+ Group2 = update_consumer_state_in_group (Group1 , Pid , SubId , ? ACTIVE ),
262267 {Group2 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
263268 end ,
264269 StreamGroups1 =
@@ -314,7 +319,7 @@ group_consumers(VirtualHost,
314319 #{GroupId := # group {consumers = Consumers }} ->
315320 Cs = lists :foldr (fun (# consumer {subscription_id = SubId ,
316321 owner = Owner ,
317- active = Active },
322+ status = Status },
318323 Acc ) ->
319324 Record =
320325 lists :foldr (fun (subscription_id , RecAcc ) ->
@@ -326,7 +331,7 @@ group_consumers(VirtualHost,
326331 Owner }
327332 | RecAcc ];
328333 (state , RecAcc ) ->
329- [{state , Active }
334+ [{state , cli_consumer_status_label ( Status ) }
330335 | RecAcc ];
331336 (Unknown , RecAcc ) ->
332337 [{Unknown ,
@@ -342,6 +347,11 @@ group_consumers(VirtualHost,
342347 {error , not_found }
343348 end .
344349
350+ cli_consumer_status_label (? ACTIVE ) ->
351+ active ;
352+ cli_consumer_status_label (_ ) ->
353+ inactive .
354+
345355-spec ensure_monitors (command (),
346356 state (),
347357 map (),
@@ -432,7 +442,7 @@ handle_group_after_connection_down(Pid,
432442 % % keep flags to know what happened
433443 {Consumers1 , ActiveRemoved , AnyRemoved } =
434444 lists :foldl (
435- fun (# consumer {pid = P , active = S }, {L , ActiveFlag , _ })
445+ fun (# consumer {pid = P , status = S }, {L , ActiveFlag , _ })
436446 when P == Pid ->
437447 {L , is_active (S ) or ActiveFlag , true };
438448 (C , {L , ActiveFlag , AnyFlag }) ->
@@ -454,6 +464,42 @@ handle_group_after_connection_down(Pid,
454464 end
455465 end .
456466
467+ -spec import_state (ra_machine :version (), map ()) -> state ().
468+ import_state (4 , #{<<" groups" >> := Groups , <<" pids_groups" >> := PidsGroups }) ->
469+ #? MODULE {groups = map_to_groups (Groups ),
470+ pids_groups = map_to_pids_groups (PidsGroups )}.
471+
472+ map_to_groups (Groups ) when is_map (Groups ) ->
473+ maps :fold (fun (K , V , Acc ) ->
474+ Acc #{K => map_to_group (V )}
475+ end , #{}, Groups );
476+ map_to_groups (_ ) ->
477+ #{}.
478+
479+ map_to_pids_groups (PidsGroups ) when is_map (PidsGroups ) ->
480+ PidsGroups ;
481+ map_to_pids_groups (_ ) ->
482+ #{}.
483+
484+ map_to_group (#{<<" consumers" >> := Consumers , <<" partition_index" >> := Index }) ->
485+ C = lists :foldl (fun (V , Acc ) ->
486+ Acc ++ [map_to_consumer (V )]
487+ end , [], Consumers ),
488+ # group {consumers = C ,
489+ partition_index = Index }.
490+
491+ map_to_consumer (#{<<" pid" >> := Pid , <<" subscription_id" >> := SubId ,
492+ <<" owner" >> := Owner , <<" active" >> := Active }) ->
493+ # consumer {pid = Pid ,
494+ subscription_id = SubId ,
495+ owner = Owner ,
496+ status = active_to_status (Active )}.
497+
498+ active_to_status (true ) ->
499+ ? ACTIVE ;
500+ active_to_status (false ) ->
501+ ? WAITING .
502+
457503is_active (waiting ) ->
458504 false ;
459505is_active (_ ) ->
@@ -476,12 +522,12 @@ do_register_consumer(VirtualHost,
476522 # consumer {pid = ConnectionPid ,
477523 owner = Owner ,
478524 subscription_id = SubscriptionId ,
479- active = waiting };
525+ status = ? WAITING };
480526 false ->
481527 # consumer {pid = ConnectionPid ,
482528 subscription_id = SubscriptionId ,
483529 owner = Owner ,
484- active = active }
530+ status = ? ACTIVE }
485531 end ,
486532 Group1 = add_to_group (Consumer , Group0 ),
487533 StreamGroups1 =
@@ -491,17 +537,17 @@ do_register_consumer(VirtualHost,
491537 Group1 ,
492538 StreamGroups0 ),
493539
494- # consumer {active = Active } = Consumer ,
540+ # consumer {status = Status } = Consumer ,
495541 Effects =
496- case Active of
497- active ->
542+ case Status of
543+ ? ACTIVE ->
498544 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
499- Stream , ConsumerName , is_active (Active ))];
545+ Stream , ConsumerName , is_active (Status ))];
500546 _ ->
501547 []
502548 end ,
503549
504- {State #? MODULE {groups = StreamGroups1 }, {ok , is_active (Active )}, Effects };
550+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active (Status )}, Effects };
505551do_register_consumer (VirtualHost ,
506552 Stream ,
507553 _PartitionIndex ,
@@ -521,7 +567,7 @@ do_register_consumer(VirtualHost,
521567 # consumer {pid = ConnectionPid ,
522568 owner = Owner ,
523569 subscription_id = SubscriptionId ,
524- active = active },
570+ status = ? ACTIVE },
525571 G1 = add_to_group (Consumer0 , Group0 ),
526572 {G1 ,
527573 [notify_consumer_effect (ConnectionPid , SubscriptionId ,
@@ -532,7 +578,7 @@ do_register_consumer(VirtualHost,
532578 # consumer {pid = ConnectionPid ,
533579 owner = Owner ,
534580 subscription_id = SubscriptionId ,
535- active = waiting },
581+ status = ? WAITING },
536582 G1 = add_to_group (Consumer0 , Group0 ),
537583
538584 case lookup_active_consumer (G1 ) of
@@ -548,7 +594,7 @@ do_register_consumer(VirtualHost,
548594 {update_consumer_state_in_group (G1 ,
549595 ActPid ,
550596 ActSubId ,
551- deactivating ),
597+ ? DEACTIVATING ),
552598 [notify_consumer_effect (ActPid ,
553599 ActSubId ,
554600 Stream ,
@@ -568,9 +614,9 @@ do_register_consumer(VirtualHost,
568614 ConsumerName ,
569615 Group1 ,
570616 StreamGroups0 ),
571- {value , # consumer {active = Active }} =
617+ {value , # consumer {status = Status }} =
572618 lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
573- {State #? MODULE {groups = StreamGroups1 }, {ok , is_active (Active )}, Effects }.
619+ {State #? MODULE {groups = StreamGroups1 }, {ok , is_active (Status )}, Effects }.
574620
575621handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
576622 {G , []};
@@ -606,7 +652,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
606652 {update_consumer_state_in_group (Group0 ,
607653 ActPid ,
608654 ActSubId ,
609- deactivating ),
655+ ? DEACTIVATING ),
610656 [notify_consumer_effect (ActPid , ActSubId ,
611657 Stream , ConsumerName , false , true )]}
612658 end ;
@@ -616,7 +662,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
616662 % % the active one is going away, picking a new one
617663 # consumer {pid = P , subscription_id = SID } =
618664 evaluate_active_consumer (Group0 ),
619- {update_consumer_state_in_group (Group0 , P , SID , active ),
665+ {update_consumer_state_in_group (Group0 , P , SID , ? ACTIVE ),
620666 [notify_consumer_effect (P , SID ,
621667 Stream , ConsumerName , true )]};
622668 false ->
@@ -683,13 +729,13 @@ compute_active_consumer(#group{consumers = Crs,
683729compute_active_consumer (# group {partition_index = - 1 ,
684730 consumers = [Consumer0 ]} =
685731 Group0 ) ->
686- Consumer1 = Consumer0 # consumer {active = active },
732+ Consumer1 = Consumer0 # consumer {status = ? ACTIVE },
687733 Group0 # group {consumers = [Consumer1 ]};
688734compute_active_consumer (# group {partition_index = - 1 ,
689735 consumers = [Consumer0 | T ]} =
690736 Group0 ) ->
691- Consumer1 = Consumer0 # consumer {active = active },
692- Consumers = lists :map (fun (C ) -> C # consumer {active = waiting } end , T ),
737+ Consumer1 = Consumer0 # consumer {status = ? ACTIVE },
738+ Consumers = lists :map (fun (C ) -> C # consumer {status = ? WAITING } end , T ),
693739 Group0 # group {consumers = [Consumer1 ] ++ Consumers }.
694740
695741evaluate_active_consumer (# group {partition_index = PartitionIndex ,
@@ -706,7 +752,7 @@ lookup_consumer(ConnectionPid, SubscriptionId,
706752 Consumers ).
707753
708754lookup_active_consumer (# group {consumers = Consumers }) ->
709- lists :search (fun (# consumer {active = Active }) -> is_active (Active ) end ,
755+ lists :search (fun (# consumer {status = Status }) -> is_active (Status ) end ,
710756 Consumers ).
711757
712758update_groups (_VirtualHost ,
@@ -732,20 +778,20 @@ update_groups(VirtualHost,
732778update_consumer_state_in_group (# group {consumers = Consumers0 } = G ,
733779 Pid ,
734780 SubId ,
735- NewState ) ->
781+ NewStatus ) ->
736782 CS1 = lists :map (fun (C0 ) ->
737783 case C0 of
738784 # consumer {pid = Pid , subscription_id = SubId } ->
739- C0 # consumer {active = NewState };
785+ C0 # consumer {status = NewStatus };
740786 C -> C
741787 end
742788 end ,
743789 Consumers0 ),
744790 G # group {consumers = CS1 }.
745791
746- update_consumers (# group {consumers = Consumers0 } = G , NewState ) ->
792+ update_consumers (# group {consumers = Consumers0 } = G , NewStatus ) ->
747793 Consumers1 = lists :map (fun (C ) ->
748- C # consumer {active = NewState }
794+ C # consumer {status = NewStatus }
749795 end , Consumers0 ),
750796 G # group {consumers = Consumers1 }.
751797
0 commit comments