106106 close_sent /3 ]).
107107-ifdef (TEST ).
108108-export ([ensure_token_expiry_timer /2 ,
109- evaluate_state_after_secret_update /4 ]).
109+ evaluate_state_after_secret_update /4 ,
110+ clean_subscriptions /4 ]).
110111-endif .
111112
112113callback_mode () ->
@@ -3280,89 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32803281
32813282clean_state_after_stream_deletion_or_failure (MemberPid , Stream ,
32823283 # stream_connection {
3283- user = # user {username = Username },
3284- virtual_host = VirtualHost ,
3285- stream_subscriptions = StreamSubscriptions ,
3286- publishers = Publishers ,
3287- publisher_to_ids = PublisherToIds ,
3288- stream_leaders = Leaders ,
3289- outstanding_requests = Requests0 } = C0 ,
3290- # stream_connection_state {consumers = Consumers } = S0 ) ->
3284+ stream_leaders = Leaders } = C0 ,
3285+ S0 ) ->
32913286 {SubscriptionsCleaned , C1 , S1 } =
32923287 case stream_has_subscriptions (Stream , C0 ) of
32933288 true ->
3294- #{Stream := SubscriptionIds } = StreamSubscriptions ,
3295- Requests1 = lists :foldl (
3296- fun (SubId , Rqsts0 ) ->
3297- #{SubId := Consumer } = Consumers ,
3298- case {MemberPid , Consumer } of
3299- {undefined , _C } ->
3300- rabbit_stream_metrics :consumer_cancelled (self (),
3301- stream_r (Stream ,
3302- C0 ),
3303- SubId ,
3304- Username ),
3305- maybe_unregister_consumer (
3306- VirtualHost , Consumer ,
3307- single_active_consumer (Consumer ),
3308- Rqsts0 );
3309- {MemberPid , # consumer {configuration =
3310- # consumer_configuration {member_pid = MemberPid }}} ->
3311- rabbit_stream_metrics :consumer_cancelled (self (),
3312- stream_r (Stream ,
3313- C0 ),
3314- SubId ,
3315- Username ),
3316- maybe_unregister_consumer (
3317- VirtualHost , Consumer ,
3318- single_active_consumer (Consumer ),
3319- Rqsts0 );
3320- _ ->
3321- Rqsts0
3322- end
3323- end , Requests0 , SubscriptionIds ),
3324- {true ,
3325- C0 # stream_connection {stream_subscriptions =
3326- maps :remove (Stream ,
3327- StreamSubscriptions ),
3328- outstanding_requests = Requests1 },
3329- S0 # stream_connection_state {consumers =
3330- maps :without (SubscriptionIds ,
3331- Consumers )}};
3289+ clean_subscriptions (MemberPid , Stream , C0 , S0 );
33323290 false ->
33333291 {false , C0 , S0 }
33343292 end ,
33353293 {PublishersCleaned , C2 , S2 } =
33363294 case stream_has_publishers (Stream , C1 ) of
33373295 true ->
3338- {PurgedPubs , PurgedPubToIds } =
3339- maps :fold (fun (PubId ,
3340- # publisher {stream = S , reference = Ref },
3341- {Pubs , PubToIds }) when S =:= Stream andalso MemberPid =:= undefined ->
3342- rabbit_stream_metrics :publisher_deleted (self (),
3343- stream_r (Stream ,
3344- C1 ),
3345- PubId ),
3346- {maps :remove (PubId , Pubs ),
3347- maps :remove ({Stream , Ref }, PubToIds )};
3348- (PubId ,
3349- # publisher {stream = S , reference = Ref , leader = MPid },
3350- {Pubs , PubToIds }) when S =:= Stream andalso MPid =:= MemberPid ->
3351- rabbit_stream_metrics :publisher_deleted (self (),
3352- stream_r (Stream ,
3353- C1 ),
3354- PubId ),
3355- {maps :remove (PubId , Pubs ),
3356- maps :remove ({Stream , Ref }, PubToIds )};
3357-
3358- (_PubId , _Publisher , {Pubs , PubToIds }) ->
3359- {Pubs , PubToIds }
3360- end ,
3361- {Publishers , PublisherToIds }, Publishers ),
3362- {true ,
3363- C1 # stream_connection {publishers = PurgedPubs ,
3364- publisher_to_ids = PurgedPubToIds },
3365- S1 };
3296+ clean_publishers (MemberPid , Stream , C1 , S1 );
33663297 false ->
33673298 {false , C1 , S1 }
33683299 end ,
@@ -3384,6 +3315,99 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33843315 {not_cleaned , C2 # stream_connection {stream_leaders = Leaders1 }, S2 }
33853316 end .
33863317
3318+ clean_subscriptions (MemberPid , Stream ,
3319+ # stream_connection {user = # user {username = Username },
3320+ virtual_host = VirtualHost ,
3321+ stream_subscriptions = StreamSubs ,
3322+ outstanding_requests = Requests0 } = C0 ,
3323+ # stream_connection_state {consumers = Consumers } = S0 ) ->
3324+ #{Stream := SubIds } = StreamSubs ,
3325+ {DelSubs1 , Requests1 } =
3326+ lists :foldl (
3327+ fun (SubId , {DelSubIds , Rqsts0 }) ->
3328+ #{SubId := Consumer } = Consumers ,
3329+ case {MemberPid , Consumer } of
3330+ {undefined , _C } ->
3331+ rabbit_stream_metrics :consumer_cancelled (self (),
3332+ stream_r (Stream ,
3333+ C0 ),
3334+ SubId ,
3335+ Username ),
3336+ Rqsts1 = maybe_unregister_consumer (
3337+ VirtualHost , Consumer ,
3338+ single_active_consumer (Consumer ),
3339+ Rqsts0 ),
3340+ {[SubId | DelSubIds ], Rqsts1 };
3341+ {MemberPid ,
3342+ # consumer {configuration =
3343+ # consumer_configuration {member_pid = MemberPid }}} ->
3344+ rabbit_stream_metrics :consumer_cancelled (self (),
3345+ stream_r (Stream ,
3346+ C0 ),
3347+ SubId ,
3348+ Username ,
3349+ false ),
3350+ Rqsts1 = maybe_unregister_consumer (
3351+ VirtualHost , Consumer ,
3352+ single_active_consumer (Consumer ),
3353+ Rqsts0 ),
3354+ {[SubId | DelSubIds ], Rqsts1 };
3355+ _ ->
3356+ {DelSubIds , Rqsts0 }
3357+ end
3358+ end , {[], Requests0 }, SubIds ),
3359+ case DelSubs1 of
3360+ [] ->
3361+ {false , C0 , S0 };
3362+ _ ->
3363+ StreamSubs1 = case SubIds -- DelSubs1 of
3364+ [] ->
3365+ maps :remove (Stream , StreamSubs );
3366+ RemSubIds ->
3367+ StreamSubs #{Stream => RemSubIds }
3368+ end ,
3369+ Consumers1 = maps :without (DelSubs1 , Consumers ),
3370+ {true ,
3371+ C0 # stream_connection {stream_subscriptions = StreamSubs1 ,
3372+ outstanding_requests = Requests1 },
3373+ S0 # stream_connection_state {consumers = Consumers1 }}
3374+ end .
3375+
3376+ clean_publishers (MemberPid , Stream ,
3377+ # stream_connection {
3378+ publishers = Publishers ,
3379+ publisher_to_ids = PublisherToIds } = C0 , S0 ) ->
3380+ {Updated , PurgedPubs , PurgedPubToIds } =
3381+ maps :fold (fun (PubId , # publisher {stream = S , reference = Ref },
3382+ {_ , Pubs , PubToIds })
3383+ when S =:= Stream andalso MemberPid =:= undefined ->
3384+ rabbit_stream_metrics :publisher_deleted (self (),
3385+ stream_r (Stream ,
3386+ C0 ),
3387+ PubId ),
3388+ {true ,
3389+ maps :remove (PubId , Pubs ),
3390+ maps :remove ({Stream , Ref }, PubToIds )};
3391+ (PubId , # publisher {stream = S , reference = Ref , leader = MPid },
3392+ {_ , Pubs , PubToIds })
3393+ when S =:= Stream andalso MPid =:= MemberPid ->
3394+ rabbit_stream_metrics :publisher_deleted (self (),
3395+ stream_r (Stream ,
3396+ C0 ),
3397+ PubId ),
3398+ {true ,
3399+ maps :remove (PubId , Pubs ),
3400+ maps :remove ({Stream , Ref }, PubToIds )};
3401+
3402+ (_PubId , _Publisher , {Updated , Pubs , PubToIds }) ->
3403+ {Updated , Pubs , PubToIds }
3404+ end ,
3405+ {false , Publishers , PublisherToIds }, Publishers ),
3406+ {Updated ,
3407+ C0 # stream_connection {publishers = PurgedPubs ,
3408+ publisher_to_ids = PurgedPubToIds },
3409+ S0 }.
3410+
33873411store_offset (Reference , _ , _ , C ) when ? IS_INVALID_REF (Reference ) ->
33883412 rabbit_log :warning (" Reference is too long to store offset: ~p " , [byte_size (Reference )]),
33893413 C ;
@@ -3401,8 +3425,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
34013425
34023426lookup_leader (Stream ,
34033427 # stream_connection {stream_leaders = StreamLeaders ,
3404- virtual_host = VirtualHost } =
3405- Connection ) ->
3428+ virtual_host = VirtualHost } = Connection ) ->
34063429 case maps :get (Stream , StreamLeaders , undefined ) of
34073430 undefined ->
34083431 case lookup_leader_from_manager (VirtualHost , Stream ) of
@@ -3411,6 +3434,7 @@ lookup_leader(Stream,
34113434 {ok , LeaderPid } ->
34123435 Connection1 =
34133436 maybe_monitor_stream (LeaderPid , Stream , Connection ),
3437+
34143438 {LeaderPid ,
34153439 Connection1 # stream_connection {stream_leaders =
34163440 StreamLeaders #{Stream =>
0 commit comments