106106 close_sent /3 ]).
107107-ifdef (TEST ).
108108-export ([ensure_token_expiry_timer /2 ,
109- evaluate_state_after_secret_update /4 ]).
109+ evaluate_state_after_secret_update /4 ,
110+ clean_subscriptions /4 ]).
110111-endif .
111112
112113callback_mode () ->
@@ -3280,89 +3281,19 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32803281
32813282clean_state_after_stream_deletion_or_failure (MemberPid , Stream ,
32823283 # stream_connection {
3283- user = # user {username = Username },
3284- virtual_host = VirtualHost ,
3285- stream_subscriptions = StreamSubscriptions ,
3286- publishers = Publishers ,
3287- publisher_to_ids = PublisherToIds ,
3288- stream_leaders = Leaders ,
3289- outstanding_requests = Requests0 } = C0 ,
3290- # stream_connection_state {consumers = Consumers } = S0 ) ->
3284+ stream_leaders = Leaders } = C0 ,
3285+ S0 ) ->
32913286 {SubscriptionsCleaned , C1 , S1 } =
32923287 case stream_has_subscriptions (Stream , C0 ) of
32933288 true ->
3294- #{Stream := SubscriptionIds } = StreamSubscriptions ,
3295- Requests1 = lists :foldl (
3296- fun (SubId , Rqsts0 ) ->
3297- #{SubId := Consumer } = Consumers ,
3298- case {MemberPid , Consumer } of
3299- {undefined , _C } ->
3300- rabbit_stream_metrics :consumer_cancelled (self (),
3301- stream_r (Stream ,
3302- C0 ),
3303- SubId ,
3304- Username ),
3305- maybe_unregister_consumer (
3306- VirtualHost , Consumer ,
3307- single_active_consumer (Consumer ),
3308- Rqsts0 );
3309- {MemberPid , # consumer {configuration =
3310- # consumer_configuration {member_pid = MemberPid }}} ->
3311- rabbit_stream_metrics :consumer_cancelled (self (),
3312- stream_r (Stream ,
3313- C0 ),
3314- SubId ,
3315- Username ),
3316- maybe_unregister_consumer (
3317- VirtualHost , Consumer ,
3318- single_active_consumer (Consumer ),
3319- Rqsts0 );
3320- _ ->
3321- Rqsts0
3322- end
3323- end , Requests0 , SubscriptionIds ),
3324- {true ,
3325- C0 # stream_connection {stream_subscriptions =
3326- maps :remove (Stream ,
3327- StreamSubscriptions ),
3328- outstanding_requests = Requests1 },
3329- S0 # stream_connection_state {consumers =
3330- maps :without (SubscriptionIds ,
3331- Consumers )}};
3289+ clean_subscriptions (MemberPid , Stream , C0 , S0 );
33323290 false ->
33333291 {false , C0 , S0 }
33343292 end ,
33353293 {PublishersCleaned , C2 , S2 } =
33363294 case stream_has_publishers (Stream , C1 ) of
33373295 true ->
3338- {PurgedPubs , PurgedPubToIds } =
3339- maps :fold (fun (PubId ,
3340- # publisher {stream = S , reference = Ref },
3341- {Pubs , PubToIds }) when S =:= Stream andalso MemberPid =:= undefined ->
3342- rabbit_stream_metrics :publisher_deleted (self (),
3343- stream_r (Stream ,
3344- C1 ),
3345- PubId ),
3346- {maps :remove (PubId , Pubs ),
3347- maps :remove ({Stream , Ref }, PubToIds )};
3348- (PubId ,
3349- # publisher {stream = S , reference = Ref , leader = MPid },
3350- {Pubs , PubToIds }) when S =:= Stream andalso MPid =:= MemberPid ->
3351- rabbit_stream_metrics :publisher_deleted (self (),
3352- stream_r (Stream ,
3353- C1 ),
3354- PubId ),
3355- {maps :remove (PubId , Pubs ),
3356- maps :remove ({Stream , Ref }, PubToIds )};
3357-
3358- (_PubId , _Publisher , {Pubs , PubToIds }) ->
3359- {Pubs , PubToIds }
3360- end ,
3361- {Publishers , PublisherToIds }, Publishers ),
3362- {true ,
3363- C1 # stream_connection {publishers = PurgedPubs ,
3364- publisher_to_ids = PurgedPubToIds },
3365- S1 };
3296+ clean_publishers (MemberPid , Stream , C1 , S1 );
33663297 false ->
33673298 {false , C1 , S1 }
33683299 end ,
@@ -3384,6 +3315,98 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33843315 {not_cleaned , C2 # stream_connection {stream_leaders = Leaders1 }, S2 }
33853316 end .
33863317
3318+ clean_subscriptions (MemberPid , Stream ,
3319+ # stream_connection {user = # user {username = Username },
3320+ virtual_host = VirtualHost ,
3321+ stream_subscriptions = StreamSubs ,
3322+ outstanding_requests = Requests0 } = C0 ,
3323+ # stream_connection_state {consumers = Consumers } = S0 ) ->
3324+ #{Stream := SubIds } = StreamSubs ,
3325+ {DelSubs1 , Requests1 } =
3326+ lists :foldl (
3327+ fun (SubId , {DelSubIds , Rqsts0 }) ->
3328+ #{SubId := Consumer } = Consumers ,
3329+ case {MemberPid , Consumer } of
3330+ {undefined , _C } ->
3331+ rabbit_stream_metrics :consumer_cancelled (self (),
3332+ stream_r (Stream ,
3333+ C0 ),
3334+ SubId ,
3335+ Username ),
3336+ Rqsts1 = maybe_unregister_consumer (
3337+ VirtualHost , Consumer ,
3338+ single_active_consumer (Consumer ),
3339+ Rqsts0 ),
3340+ {[SubId | DelSubIds ], Rqsts1 };
3341+ {MemberPid ,
3342+ # consumer {configuration =
3343+ # consumer_configuration {member_pid = MemberPid }}} ->
3344+ rabbit_stream_metrics :consumer_cancelled (self (),
3345+ stream_r (Stream ,
3346+ C0 ),
3347+ SubId ,
3348+ Username ),
3349+ Rqsts1 = maybe_unregister_consumer (
3350+ VirtualHost , Consumer ,
3351+ single_active_consumer (Consumer ),
3352+ Rqsts0 ),
3353+ {[SubId | DelSubIds ], Rqsts1 };
3354+ _ ->
3355+ {DelSubIds , Rqsts0 }
3356+ end
3357+ end , {[], Requests0 }, SubIds ),
3358+ case DelSubs1 of
3359+ [] ->
3360+ {false , C0 , S0 };
3361+ _ ->
3362+ StreamSubs1 = case SubIds -- DelSubs1 of
3363+ [] ->
3364+ maps :remove (Stream , StreamSubs );
3365+ RemSubIds ->
3366+ StreamSubs #{Stream => RemSubIds }
3367+ end ,
3368+ Consumers1 = maps :without (DelSubs1 , Consumers ),
3369+ {true ,
3370+ C0 # stream_connection {stream_subscriptions = StreamSubs1 ,
3371+ outstanding_requests = Requests1 },
3372+ S0 # stream_connection_state {consumers = Consumers1 }}
3373+ end .
3374+
3375+ clean_publishers (MemberPid , Stream ,
3376+ # stream_connection {
3377+ publishers = Publishers ,
3378+ publisher_to_ids = PublisherToIds } = C0 , S0 ) ->
3379+ {Updated , PurgedPubs , PurgedPubToIds } =
3380+ maps :fold (fun (PubId , # publisher {stream = S , reference = Ref },
3381+ {_ , Pubs , PubToIds })
3382+ when S =:= Stream andalso MemberPid =:= undefined ->
3383+ rabbit_stream_metrics :publisher_deleted (self (),
3384+ stream_r (Stream ,
3385+ C0 ),
3386+ PubId ),
3387+ {true ,
3388+ maps :remove (PubId , Pubs ),
3389+ maps :remove ({Stream , Ref }, PubToIds )};
3390+ (PubId , # publisher {stream = S , reference = Ref , leader = MPid },
3391+ {_ , Pubs , PubToIds })
3392+ when S =:= Stream andalso MPid =:= MemberPid ->
3393+ rabbit_stream_metrics :publisher_deleted (self (),
3394+ stream_r (Stream ,
3395+ C0 ),
3396+ PubId ),
3397+ {true ,
3398+ maps :remove (PubId , Pubs ),
3399+ maps :remove ({Stream , Ref }, PubToIds )};
3400+
3401+ (_PubId , _Publisher , {Updated , Pubs , PubToIds }) ->
3402+ {Updated , Pubs , PubToIds }
3403+ end ,
3404+ {false , Publishers , PublisherToIds }, Publishers ),
3405+ {Updated ,
3406+ C0 # stream_connection {publishers = PurgedPubs ,
3407+ publisher_to_ids = PurgedPubToIds },
3408+ S0 }.
3409+
33873410store_offset (Reference , _ , _ , C ) when ? IS_INVALID_REF (Reference ) ->
33883411 rabbit_log :warning (" Reference is too long to store offset: ~p " , [byte_size (Reference )]),
33893412 C ;
@@ -3401,8 +3424,7 @@ store_offset(Reference, Stream, Offset, Connection0) ->
34013424
34023425lookup_leader (Stream ,
34033426 # stream_connection {stream_leaders = StreamLeaders ,
3404- virtual_host = VirtualHost } =
3405- Connection ) ->
3427+ virtual_host = VirtualHost } = Connection ) ->
34063428 case maps :get (Stream , StreamLeaders , undefined ) of
34073429 undefined ->
34083430 case lookup_leader_from_manager (VirtualHost , Stream ) of
@@ -3411,6 +3433,7 @@ lookup_leader(Stream,
34113433 {ok , LeaderPid } ->
34123434 Connection1 =
34133435 maybe_monitor_stream (LeaderPid , Stream , Connection ),
3436+
34143437 {LeaderPid ,
34153438 Connection1 # stream_connection {stream_leaders =
34163439 StreamLeaders #{Stream =>
0 commit comments