99% % The Original Code is RabbitMQ.
1010% %
1111% % The Initial Developer of the Original Code is Pivotal Software, Inc.
12- % % Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
12+ % % Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
1313% % The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
1414% %
1515
@@ -2249,7 +2249,7 @@ handle_frame_post_auth(Transport,
22492249 {Connection , State };
22502250 true ->
22512251 {Connection1 , State1 } =
2252- remove_subscription (SubscriptionId , Connection , State ),
2252+ remove_subscription (SubscriptionId , Connection , State , true ),
22532253 response_ok (Transport , Connection , unsubscribe , CorrelationId ),
22542254 {Connection1 , State1 }
22552255 end ;
@@ -3081,7 +3081,7 @@ evaluate_state_after_secret_update(Transport,
30813081 _ ->
30823082 {C1 , S1 } =
30833083 lists :foldl (fun (SubId , {Conn , St }) ->
3084- remove_subscription (SubId , Conn , St )
3084+ remove_subscription (SubId , Conn , St , false )
30853085 end , {C0 , S0 }, Subs ),
30863086 {Acc #{Str => ok }, C1 , S1 }
30873087 end
@@ -3216,7 +3216,8 @@ notify_connection_closed(#statem_data{connection =
32163216 ConnectionState }) ->
32173217 rabbit_core_metrics :connection_closed (self ()),
32183218 [rabbit_stream_metrics :consumer_cancelled (self (),
3219- stream_r (S , Connection ), SubId )
3219+ stream_r (S , Connection ),
3220+ SubId , false )
32203221 || # consumer {configuration =
32213222 # consumer_configuration {stream = S ,
32223223 subscription_id = SubId }}
@@ -3304,7 +3305,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33043305 rabbit_stream_metrics :consumer_cancelled (self (),
33053306 stream_r (Stream ,
33063307 C0 ),
3307- SubId ),
3308+ SubId ,
3309+ false ),
33083310 maybe_unregister_consumer (
33093311 VirtualHost , Consumer ,
33103312 single_active_consumer (Consumer ),
@@ -3314,7 +3316,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33143316 rabbit_stream_metrics :consumer_cancelled (self (),
33153317 stream_r (Stream ,
33163318 C0 ),
3317- SubId ),
3319+ SubId ,
3320+ false ),
33183321 maybe_unregister_consumer (
33193322 VirtualHost , Consumer ,
33203323 single_active_consumer (Consumer ),
@@ -3431,7 +3434,8 @@ remove_subscription(SubscriptionId,
34313434 stream_subscriptions =
34323435 StreamSubscriptions } =
34333436 Connection ,
3434- # stream_connection_state {consumers = Consumers } = State ) ->
3437+ # stream_connection_state {consumers = Consumers } = State ,
3438+ Notify ) ->
34353439 #{SubscriptionId := Consumer } = Consumers ,
34363440 # consumer {log = Log ,
34373441 configuration = # consumer_configuration {stream = Stream , member_pid = MemberPid }} =
@@ -3457,7 +3461,8 @@ remove_subscription(SubscriptionId,
34573461 Connection2 = maybe_clean_connection_from_stream (MemberPid , Stream , Connection1 ),
34583462 rabbit_stream_metrics :consumer_cancelled (self (),
34593463 stream_r (Stream , Connection2 ),
3460- SubscriptionId ),
3464+ SubscriptionId ,
3465+ Notify ),
34613466
34623467 Requests1 = maybe_unregister_consumer (
34633468 VirtualHost , Consumer ,
0 commit comments