4343 ensure_monitors /4 ,
4444 handle_connection_down /3 ,
4545 handle_node_reconnected /3 ,
46- forget_connection /2 ,
46+ presume_connection_down /2 ,
4747 consumer_groups /3 ,
4848 group_consumers /5 ,
4949 overview /1 ,
6666
6767-define (CONNECTED , connected ).
6868-define (DISCONNECTED , disconnected ).
69- -define (FORGOTTTEN , forgotten ).
69+ -define (PDOWN , presumed_down ).
7070
7171-define (CONN_ACT , {? CONNECTED , ? ACTIVE }).
7272-define (CONN_WAIT , {? CONNECTED , ? WAITING }).
7373-define (DISCONN_ACT , {? DISCONNECTED , ? ACTIVE }).
74- -define (FORG_ACT , {? FORGOTTTEN , ? ACTIVE }).
74+ -define (PDOWN_ACT , {? PDOWN , ? ACTIVE }).
7575
7676-define (DISCONNECTED_TIMEOUT_APP_KEY , stream_sac_disconnected_timeout ).
7777-define (DISCONNECTED_TIMEOUT_CONF_KEY , disconnected_timeout ).
@@ -389,7 +389,7 @@ handle_forgotten_active_reconnected(Pid,
389389 lists :foldr (fun (# consumer {status = St ,
390390 pid = P ,
391391 subscription_id = SID } = C , {Cs , Eff })
392- when P =:= Pid andalso St =:= ? FORG_ACT ->
392+ when P =:= Pid andalso St =:= ? PDOWN_ACT ->
393393 {[csr_status (C , ? CONN_WAIT ) | Cs ],
394394 [notify_consumer_effect (Pid , SID , S ,
395395 Name , false , true ) | Eff ]};
@@ -400,13 +400,13 @@ handle_forgotten_active_reconnected(Pid,
400400 lists :foldr (fun (# consumer {status = St ,
401401 pid = P ,
402402 subscription_id = SID } = C , {Cs , Eff })
403- when P =:= Pid andalso St =:= ? FORG_ACT ->
403+ when P =:= Pid andalso St =:= ? PDOWN_ACT ->
404404 % % update forgotten active
405405 % % tell it to step down
406406 {[csr_status (C , ? CONN_WAIT ) | Cs ],
407407 [notify_consumer_effect (P , SID , S ,
408408 Name , false , true ) | Eff ]};
409- (# consumer {status = {? FORGOTTTEN , _ },
409+ (# consumer {status = {? PDOWN , _ },
410410 pid = P } = C , {Cs , Eff })
411411 when P =:= Pid ->
412412 % % update forgotten
@@ -428,7 +428,7 @@ handle_forgotten_active_reconnected(Pid,
428428 {S0 #? MODULE {groups = Groups1 }, Eff1 }.
429429
430430has_forgotten_active (# group {consumers = Consumers }, Pid ) ->
431- case lists :search (fun (# consumer {status = ? FORG_ACT ,
431+ case lists :search (fun (# consumer {status = ? PDOWN_ACT ,
432432 pid = P }) when P =:= Pid ->
433433 true ;
434434 (_ ) -> false
@@ -453,8 +453,6 @@ has_consumer_with_status(#group{consumers = Consumers}, Status) ->
453453 true
454454 end .
455455
456-
457-
458456maybe_rebalance_group (# group {partition_index = - 1 , consumers = Consumers0 } = G0 ,
459457 {_VH , S , Name }) ->
460458 case lookup_active_consumer (G0 ) of
@@ -611,7 +609,7 @@ group_consumers(VirtualHost,
611609 {error , not_found }
612610 end .
613611
614- cli_consumer_status_label ({? FORGOTTTEN , _ }) ->
612+ cli_consumer_status_label ({? PDOWN , _ }) ->
615613 inactive ;
616614cli_consumer_status_label ({_ , ? ACTIVE }) ->
617615 active ;
@@ -757,17 +755,17 @@ handle_node_reconnected(Node,
757755
758756 {State0 #? MODULE {pids_groups = PidsGroups1 }, Effects1 }.
759757
760- -spec forget_connection (connection_pid (), state ()) ->
758+ -spec presume_connection_down (connection_pid (), state ()) ->
761759 {state (), ra_machine :effects ()}.
762- forget_connection (Pid , #? MODULE {groups = Groups } = State0 ) ->
760+ presume_connection_down (Pid , #? MODULE {groups = Groups } = State0 ) ->
763761 {State1 , Eff } =
764- maps :fold (fun (G , _ , {St , Eff }) ->
765- handle_group_forget_connection (Pid , St , Eff , G )
766- end , {State0 , []}, Groups ),
762+ maps :fold (fun (G , _ , {St , Eff }) ->
763+ handle_group_connection_presumed_down (Pid , St , Eff , G )
764+ end , {State0 , []}, Groups ),
767765 {State1 , Eff }.
768766
769- handle_group_forget_connection (Pid , #? MODULE {groups = Groups0 } = S0 ,
770- Eff0 , {VH , S , Name } = K ) ->
767+ handle_group_connection_presumed_down (Pid , #? MODULE {groups = Groups0 } = S0 ,
768+ Eff0 , {VH , S , Name } = K ) ->
771769 case lookup_group (VH , S , Name , Groups0 ) of
772770 undefined ->
773771 {S0 , Eff0 };
@@ -776,7 +774,7 @@ handle_group_forget_connection(Pid, #?MODULE{groups = Groups0} = S0,
776774 lists :foldr (
777775 fun (# consumer {pid = P , status = {? DISCONNECTED , St }} = C , {L , _ })
778776 when P == Pid ->
779- {[csr_status (C , {? FORGOTTTEN , St }) | L ], true };
777+ {[csr_status (C , {? PDOWN , St }) | L ], true };
780778 (C , {L , UpdatedFlag }) ->
781779 {[C | L ], UpdatedFlag or false }
782780 end , {[], false }, Consumers0 ),
@@ -939,7 +937,7 @@ active_to_status(true) ->
939937active_to_status (false ) ->
940938 {? CONNECTED , ? WAITING }.
941939
942- is_active ({? FORGOTTTEN , _ }) ->
940+ is_active ({? PDOWN , _ }) ->
943941 false ;
944942is_active ({_ , ? ACTIVE }) ->
945943 true ;
@@ -1128,7 +1126,7 @@ has_consumers_from_pid(#group{consumers = Consumers}, Pid) ->
11281126
11291127compute_active_consumer (# group {partition_index = - 1 ,
11301128 consumers = Crs } = Group )
1131- when length (Crs ) == 0 ->
1129+ when length (Crs ) == 0 ->
11321130 Group ;
11331131compute_active_consumer (# group {partition_index = - 1 ,
11341132 consumers = Consumers } = G ) ->
@@ -1140,7 +1138,7 @@ compute_active_consumer(#group{partition_index = -1,
11401138 false ->
11411139 case evaluate_active_consumer (G ) of
11421140 undefined ->
1143- ok ;
1141+ G ;
11441142 # consumer {pid = Pid , subscription_id = SubId } ->
11451143 Consumers1 =
11461144 lists :foldr (
@@ -1173,17 +1171,17 @@ evaluate_active_consumer(#group{consumers = Consumers} = G) ->
11731171 end .
11741172
11751173do_evaluate_active_consumer (# group {consumers = Consumers })
1176- when length (Consumers ) == 0 ->
1174+ when length (Consumers ) == 0 ->
11771175 undefined ;
11781176do_evaluate_active_consumer (# group {partition_index = - 1 ,
1179- consumers = [Consumer ]}) ->
1177+ consumers = [Consumer ]}) ->
11801178 Consumer ;
11811179do_evaluate_active_consumer (# group {partition_index = - 1 ,
1182- consumers = [Consumer | _ ]}) ->
1180+ consumers = [Consumer | _ ]}) ->
11831181 Consumer ;
11841182do_evaluate_active_consumer (# group {partition_index = PartitionIndex ,
11851183 consumers = Consumers })
1186- when PartitionIndex >= 0 ->
1184+ when PartitionIndex >= 0 ->
11871185 ActiveConsumerIndex = PartitionIndex rem length (Consumers ),
11881186 lists :nth (ActiveConsumerIndex + 1 , Consumers ).
11891187
@@ -1284,8 +1282,7 @@ compute_node_pid_group_dependencies(Node, Groups) ->
12841282 end , Acc , Consumers )
12851283 end , #{}, Groups ).
12861284
1287- -spec csr (pid (), subscription_id (), owner (),
1288- {consumer_connectivity (), consumer_status ()}) ->
1285+ -spec csr (pid (), subscription_id (), owner (), consumer_status ()) ->
12891286 consumer ().
12901287csr (Pid , Id , Owner , Status ) ->
12911288 # consumer {pid = Pid ,
@@ -1294,8 +1291,7 @@ csr(Pid, Id, Owner, Status) ->
12941291 status = Status ,
12951292 ts = ts ()}.
12961293
1297- -spec csr_status (consumer (), {consumer_connectivity (), consumer_status ()}) ->
1298- consumer ().
1294+ -spec csr_status (consumer (), consumer_status ()) -> consumer ().
12991295csr_status (C , Status ) ->
13001296 C # consumer {status = Status , ts = ts ()}.
13011297
0 commit comments