8181-define (UNKNOWN_FIELD , unknown_field ).
8282-define (SILENT_CLOSE_DELAY , 3_000 ).
8383-define (IS_INVALID_REF (Ref ), is_binary (Ref ) andalso byte_size (Ref ) > 255 ).
84+ -define (SAC_MOD , rabbit_stream_sac_coordinator ).
8485
8586-import (rabbit_stream_utils , [check_write_permitted /2 ,
8687 check_read_permitted /3 ]).
@@ -722,7 +723,7 @@ open(info, {OK, S, Data},
722723 connection_state = State2 }}
723724 end ;
724725open (info , {sac , check_connection , _ }, State ) ->
725- rabbit_stream_sac_coordinator : connection_reconnected (self ()),
726+ _ = sac_connection_reconnected (self ()),
726727 {keep_state , State };
727728open (info ,
728729 {sac , #{subscription_id := SubId ,
@@ -794,17 +795,15 @@ open(info,
794795 rabbit_log :debug (" Subscription ~tp on ~tp has been deleted." ,
795796 [SubId , Stream ]),
796797 rabbit_log :debug (" Active ~tp , message ~tp " , [Active , Msg ]),
797- case {Active , Msg } of
798- {false , #{stepping_down := true ,
799- stream := St ,
800- consumer_name := ConsumerName }} ->
801- rabbit_log :debug (" Former active consumer gone, activating consumer " ++
802- " on stream ~tp , group ~tp " , [St , ConsumerName ]),
803- _ = rabbit_stream_sac_coordinator :activate_consumer (VirtualHost ,
804- St ,
805- ConsumerName );
806- _ ->
807- ok
798+ _ = case {Active , Msg } of
799+ {false , #{stepping_down := true ,
800+ stream := St ,
801+ consumer_name := ConsumerName }} ->
802+ rabbit_log :debug (" Former active consumer gone, activating consumer " ++
803+ " on stream ~tp , group ~tp " , [St , ConsumerName ]),
804+ sac_activate_consumer (VirtualHost , St , ConsumerName );
805+ _ ->
806+ ok
808807 end ,
809808 {Connection0 , ConnState0 }
810809 end ,
@@ -2554,9 +2553,8 @@ handle_frame_post_auth(Transport,
25542553 rabbit_log :debug (" Subscription ~tp on stream ~tp , group ~tp " ++
25552554 " has stepped down, activating consumer" ,
25562555 [SubscriptionId , Stream , ConsumerName ]),
2557- _ = rabbit_stream_sac_coordinator :activate_consumer (VirtualHost ,
2558- Stream ,
2559- ConsumerName ),
2556+ _ = sac_activate_consumer (VirtualHost , Stream ,
2557+ ConsumerName ),
25602558 ok ;
25612559 _ ->
25622560 ok
@@ -3015,21 +3013,9 @@ handle_subscription(Transport,#stream_connection{
30153013
30163014maybe_register_consumer (_ , _ , _ , _ , _ , _ , false = _Sac ) ->
30173015 {ok , true };
3018- maybe_register_consumer (VirtualHost ,
3019- Stream ,
3020- ConsumerName ,
3021- ConnectionName ,
3022- SubscriptionId ,
3023- Properties ,
3024- true ) ->
3025- PartitionIndex = partition_index (VirtualHost , Stream , Properties ),
3026- rabbit_stream_sac_coordinator :register_consumer (VirtualHost ,
3027- Stream ,
3028- PartitionIndex ,
3029- ConsumerName ,
3030- self (),
3031- ConnectionName ,
3032- SubscriptionId ).
3016+ maybe_register_consumer (VH , St , Name , ConnName , SubId , Properties , true ) ->
3017+ PartitionIndex = partition_index (VH , St , Properties ),
3018+ sac_register_consumer (VH , St , PartitionIndex , Name , self (), ConnName , SubId ).
30333019
30343020maybe_send_consumer_update (Transport ,
30353021 Connection = # stream_connection {
@@ -3175,13 +3161,12 @@ maybe_unregister_consumer(VirtualHost,
31753161 ConsumerName = consumer_name (Properties ),
31763162
31773163 Requests1 = maps :fold (
3178- fun (_ , # request {content =
3179- #{active := false ,
3180- subscription_id := SubId ,
3181- stepping_down := true }}, Acc ) when SubId =:= SubscriptionId ->
3182- _ = rabbit_stream_sac_coordinator :activate_consumer (VirtualHost ,
3183- Stream ,
3184- ConsumerName ),
3164+ fun (_ , # request {content = #{active := false ,
3165+ subscription_id := SubId ,
3166+ stepping_down := true }}, Acc )
3167+ when SubId =:= SubscriptionId ->
3168+ _ = sac_activate_consumer (VirtualHost , Stream ,
3169+ ConsumerName ),
31853170 rabbit_log :debug (" Outstanding SAC activation request for stream '~tp ', " ++
31863171 " group '~tp ', sending activation." ,
31873172 [Stream , ConsumerName ]),
@@ -3190,11 +3175,8 @@ maybe_unregister_consumer(VirtualHost,
31903175 Acc #{K => V }
31913176 end , maps :new (), Requests ),
31923177
3193- _ = rabbit_stream_sac_coordinator :unregister_consumer (VirtualHost ,
3194- Stream ,
3195- ConsumerName ,
3196- self (),
3197- SubscriptionId ),
3178+ _ = sac_unregister_consumer (VirtualHost , Stream , ConsumerName ,
3179+ self (), SubscriptionId ),
31983180 Requests1 .
31993181
32003182partition_index (VirtualHost , Stream , Properties ) ->
@@ -4037,3 +4019,40 @@ stream_from_consumers(SubId, Consumers) ->
40374019% % for a bit so they can't DOS us with repeated failed logins etc.
40384020silent_close_delay () ->
40394021 timer :sleep (? SILENT_CLOSE_DELAY ).
4022+
4023+ sac_connection_reconnected (Pid ) ->
4024+ sac_call (fun () ->
4025+ ? SAC_MOD :connection_reconnected (Pid )
4026+ end ).
4027+
4028+ sac_activate_consumer (VH , St , Name ) ->
4029+ sac_call (fun () ->
4030+ ? SAC_MOD :activate_consumer (VH , St , Name )
4031+ end ).
4032+
4033+ sac_register_consumer (VH , St , PartitionIndex , Name , Pid , ConnName , SubId ) ->
4034+ sac_call (fun () ->
4035+ ? SAC_MOD :register_consumer (VH , St , PartitionIndex ,
4036+ Name , Pid , ConnName ,
4037+ SubId )
4038+ end ).
4039+
4040+ sac_unregister_consumer (VH , St , Name , Pid , SubId ) ->
4041+ sac_call (fun () ->
4042+ ? SAC_MOD :unregister_consumer (VH , St , Name , Pid , SubId )
4043+ end ).
4044+
4045+ sac_call (Call ) ->
4046+ case Call () of
4047+ {error , Reason } = Err ->
4048+ case ? SAC_MOD :is_sac_error (Reason ) of
4049+ true ->
4050+ Err ;
4051+ _ ->
4052+ rabbit_log :info (" Stream SAC coordinator call failed with ~tp " ,
4053+ [Reason ]),
4054+ throw ({stop , {shutdown , stream_sac_coordinator_error }})
4055+ end ;
4056+ R ->
4057+ R
4058+ end .
0 commit comments