106106 close_sent /3 ]).
107107-ifdef (TEST ).
108108-export ([ensure_token_expiry_timer /2 ,
109- evaluate_state_after_secret_update /4 ]).
109+ evaluate_state_after_secret_update /4 ,
110+ clean_subscriptions /4 ]).
110111-endif .
111112
112113callback_mode () ->
@@ -3280,91 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32803281
32813282clean_state_after_stream_deletion_or_failure (MemberPid , Stream ,
32823283 # stream_connection {
3283- user = # user {username = Username },
3284- virtual_host = VirtualHost ,
3285- stream_subscriptions = StreamSubscriptions ,
3286- publishers = Publishers ,
3287- publisher_to_ids = PublisherToIds ,
3288- stream_leaders = Leaders ,
3289- outstanding_requests = Requests0 } = C0 ,
3290- # stream_connection_state {consumers = Consumers } = S0 ) ->
3284+ stream_leaders = Leaders } = C0 ,
3285+ S0 ) ->
32913286 {SubscriptionsCleaned , C1 , S1 } =
32923287 case stream_has_subscriptions (Stream , C0 ) of
32933288 true ->
3294- #{Stream := SubscriptionIds } = StreamSubscriptions ,
3295- Requests1 = lists :foldl (
3296- fun (SubId , Rqsts0 ) ->
3297- #{SubId := Consumer } = Consumers ,
3298- case {MemberPid , Consumer } of
3299- {undefined , _C } ->
3300- rabbit_stream_metrics :consumer_cancelled (self (),
3301- stream_r (Stream ,
3302- C0 ),
3303- SubId ,
3304- Username ,
3305- false ),
3306- maybe_unregister_consumer (
3307- VirtualHost , Consumer ,
3308- single_active_consumer (Consumer ),
3309- Rqsts0 );
3310- {MemberPid , # consumer {configuration =
3311- # consumer_configuration {member_pid = MemberPid }}} ->
3312- rabbit_stream_metrics :consumer_cancelled (self (),
3313- stream_r (Stream ,
3314- C0 ),
3315- SubId ,
3316- Username ,
3317- false ),
3318- maybe_unregister_consumer (
3319- VirtualHost , Consumer ,
3320- single_active_consumer (Consumer ),
3321- Rqsts0 );
3322- _ ->
3323- Rqsts0
3324- end
3325- end , Requests0 , SubscriptionIds ),
3326- {true ,
3327- C0 # stream_connection {stream_subscriptions =
3328- maps :remove (Stream ,
3329- StreamSubscriptions ),
3330- outstanding_requests = Requests1 },
3331- S0 # stream_connection_state {consumers =
3332- maps :without (SubscriptionIds ,
3333- Consumers )}};
3289+ clean_subscriptions (MemberPid , Stream , C0 , S0 );
33343290 false ->
33353291 {false , C0 , S0 }
33363292 end ,
33373293 {PublishersCleaned , C2 , S2 } =
33383294 case stream_has_publishers (Stream , C1 ) of
33393295 true ->
3340- {PurgedPubs , PurgedPubToIds } =
3341- maps :fold (fun (PubId ,
3342- # publisher {stream = S , reference = Ref },
3343- {Pubs , PubToIds }) when S =:= Stream andalso MemberPid =:= undefined ->
3344- rabbit_stream_metrics :publisher_deleted (self (),
3345- stream_r (Stream ,
3346- C1 ),
3347- PubId ),
3348- {maps :remove (PubId , Pubs ),
3349- maps :remove ({Stream , Ref }, PubToIds )};
3350- (PubId ,
3351- # publisher {stream = S , reference = Ref , leader = MPid },
3352- {Pubs , PubToIds }) when S =:= Stream andalso MPid =:= MemberPid ->
3353- rabbit_stream_metrics :publisher_deleted (self (),
3354- stream_r (Stream ,
3355- C1 ),
3356- PubId ),
3357- {maps :remove (PubId , Pubs ),
3358- maps :remove ({Stream , Ref }, PubToIds )};
3359-
3360- (_PubId , _Publisher , {Pubs , PubToIds }) ->
3361- {Pubs , PubToIds }
3362- end ,
3363- {Publishers , PublisherToIds }, Publishers ),
3364- {true ,
3365- C1 # stream_connection {publishers = PurgedPubs ,
3366- publisher_to_ids = PurgedPubToIds },
3367- S1 };
3296+ clean_publishers (MemberPid , Stream , C1 , S1 );
33683297 false ->
33693298 {false , C1 , S1 }
33703299 end ,
@@ -3386,6 +3315,100 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33863315 {not_cleaned , C2 # stream_connection {stream_leaders = Leaders1 }, S2 }
33873316 end .
33883317
3318+ clean_subscriptions (MemberPid , Stream ,
3319+ # stream_connection {user = # user {username = Username },
3320+ virtual_host = VirtualHost ,
3321+ stream_subscriptions = StreamSubs ,
3322+ outstanding_requests = Requests0 } = C0 ,
3323+ # stream_connection_state {consumers = Consumers } = S0 ) ->
3324+ #{Stream := SubIds } = StreamSubs ,
3325+ {DelSubs1 , Requests1 } =
3326+ lists :foldl (
3327+ fun (SubId , {DelSubIds , Rqsts0 }) ->
3328+ #{SubId := Consumer } = Consumers ,
3329+ case {MemberPid , Consumer } of
3330+ {undefined , _C } ->
3331+ rabbit_stream_metrics :consumer_cancelled (self (),
3332+ stream_r (Stream ,
3333+ C0 ),
3334+ SubId ,
3335+ Username ,
3336+ false ),
3337+ Rqsts1 = maybe_unregister_consumer (
3338+ VirtualHost , Consumer ,
3339+ single_active_consumer (Consumer ),
3340+ Rqsts0 ),
3341+ {[SubId | DelSubIds ], Rqsts1 };
3342+ {MemberPid ,
3343+ # consumer {configuration =
3344+ # consumer_configuration {member_pid = MemberPid }}} ->
3345+ rabbit_stream_metrics :consumer_cancelled (self (),
3346+ stream_r (Stream ,
3347+ C0 ),
3348+ SubId ,
3349+ Username ,
3350+ false ),
3351+ Rqsts1 = maybe_unregister_consumer (
3352+ VirtualHost , Consumer ,
3353+ single_active_consumer (Consumer ),
3354+ Rqsts0 ),
3355+ {[SubId | DelSubIds ], Rqsts1 };
3356+ _ ->
3357+ {DelSubIds , Rqsts0 }
3358+ end
3359+ end , {[], Requests0 }, SubIds ),
3360+ case DelSubs1 of
3361+ [] ->
3362+ {false , C0 , S0 };
3363+ _ ->
3364+ StreamSubs1 = case SubIds -- DelSubs1 of
3365+ [] ->
3366+ maps :remove (Stream , StreamSubs );
3367+ RemSubIds ->
3368+ StreamSubs #{Stream => RemSubIds }
3369+ end ,
3370+ Consumers1 = maps :without (DelSubs1 , Consumers ),
3371+ {true ,
3372+ C0 # stream_connection {stream_subscriptions = StreamSubs1 ,
3373+ outstanding_requests = Requests1 },
3374+ S0 # stream_connection_state {consumers = Consumers1 }}
3375+ end .
3376+
3377+ clean_publishers (MemberPid , Stream ,
3378+ # stream_connection {
3379+ publishers = Publishers ,
3380+ publisher_to_ids = PublisherToIds } = C0 , S0 ) ->
3381+ {Updated , PurgedPubs , PurgedPubToIds } =
3382+ maps :fold (fun (PubId , # publisher {stream = S , reference = Ref },
3383+ {_ , Pubs , PubToIds })
3384+ when S =:= Stream andalso MemberPid =:= undefined ->
3385+ rabbit_stream_metrics :publisher_deleted (self (),
3386+ stream_r (Stream ,
3387+ C0 ),
3388+ PubId ),
3389+ {true ,
3390+ maps :remove (PubId , Pubs ),
3391+ maps :remove ({Stream , Ref }, PubToIds )};
3392+ (PubId , # publisher {stream = S , reference = Ref , leader = MPid },
3393+ {_ , Pubs , PubToIds })
3394+ when S =:= Stream andalso MPid =:= MemberPid ->
3395+ rabbit_stream_metrics :publisher_deleted (self (),
3396+ stream_r (Stream ,
3397+ C0 ),
3398+ PubId ),
3399+ {true ,
3400+ maps :remove (PubId , Pubs ),
3401+ maps :remove ({Stream , Ref }, PubToIds )};
3402+
3403+ (_PubId , _Publisher , {Updated , Pubs , PubToIds }) ->
3404+ {Updated , Pubs , PubToIds }
3405+ end ,
3406+ {false , Publishers , PublisherToIds }, Publishers ),
3407+ {Updated ,
3408+ C0 # stream_connection {publishers = PurgedPubs ,
3409+ publisher_to_ids = PurgedPubToIds },
3410+ S0 }.
3411+
33893412store_offset (Reference , _ , _ , C ) when ? IS_INVALID_REF (Reference ) ->
33903413 rabbit_log :warning (" Reference is too long to store offset: ~p " , [byte_size (Reference )]),
33913414 C ;
@@ -3403,8 +3426,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
34033426
34043427lookup_leader (Stream ,
34053428 # stream_connection {stream_leaders = StreamLeaders ,
3406- virtual_host = VirtualHost } =
3407- Connection ) ->
3429+ virtual_host = VirtualHost } = Connection ) ->
34083430 case maps :get (Stream , StreamLeaders , undefined ) of
34093431 undefined ->
34103432 case lookup_leader_from_manager (VirtualHost , Stream ) of
@@ -3413,6 +3435,7 @@ lookup_leader(Stream,
34133435 {ok , LeaderPid } ->
34143436 Connection1 =
34153437 maybe_monitor_stream (LeaderPid , Stream , Connection ),
3438+
34163439 {LeaderPid ,
34173440 Connection1 # stream_connection {stream_leaders =
34183441 StreamLeaders #{Stream =>
0 commit comments