Skip to content

Commit 7410117

Browse files
Merge pull request #13976 from rabbitmq/mergify/bp/v4.1.x/pr-13970
Always emit consumer_deleted event when stream consumer goes away (backport #13970)
2 parents 5e3cfbe + 6c115f6 commit 7410117

File tree

2 files changed

+13
-21
lines changed

2 files changed

+13
-21
lines changed

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([init/0]).
2323
-export([consumer_created/10,
2424
consumer_updated/9,
25-
consumer_cancelled/5]).
25+
consumer_cancelled/4]).
2626
-export([publisher_created/4,
2727
publisher_updated/7,
2828
publisher_deleted/3]).
@@ -121,21 +121,17 @@ consumer_updated(Connection,
121121

122122
ok.
123123

124-
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) ->
124+
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser) ->
125125
ets:delete(?TABLE_CONSUMER,
126126
{StreamResource, Connection, SubscriptionId}),
127127
rabbit_global_counters:consumer_deleted(stream),
128128
rabbit_core_metrics:consumer_deleted(Connection,
129129
consumer_tag(SubscriptionId),
130130
StreamResource),
131-
case Notify of
132-
true ->
133-
rabbit_event:notify(consumer_deleted,
134-
[{consumer_tag, consumer_tag(SubscriptionId)},
135-
{channel, self()}, {queue, StreamResource},
136-
{user_who_performed_action, ActingUser}]);
137-
_ -> ok
138-
end,
131+
rabbit_event:notify(consumer_deleted,
132+
[{consumer_tag, consumer_tag(SubscriptionId)},
133+
{channel, self()}, {queue, StreamResource},
134+
{user_who_performed_action, ActingUser}]),
139135
ok.
140136

141137
publisher_created(Connection,

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2155,7 +2155,7 @@ handle_frame_post_auth(Transport,
21552155
{Connection, State};
21562156
true ->
21572157
{Connection1, State1} =
2158-
remove_subscription(SubscriptionId, Connection, State, true),
2158+
remove_subscription(SubscriptionId, Connection, State),
21592159
response_ok(Transport, Connection, unsubscribe, CorrelationId),
21602160
{Connection1, State1}
21612161
end;
@@ -3084,7 +3084,7 @@ evaluate_state_after_secret_update(Transport,
30843084
_ ->
30853085
{C1, S1} =
30863086
lists:foldl(fun(SubId, {Conn, St}) ->
3087-
remove_subscription(SubId, Conn, St, false)
3087+
remove_subscription(SubId, Conn, St)
30883088
end, {C0, S0}, Subs),
30893089
{Acc#{Str => ok}, C1, S1}
30903090
end
@@ -3216,7 +3216,7 @@ notify_connection_closed(#statem_data{
32163216
rabbit_core_metrics:connection_closed(self()),
32173217
[rabbit_stream_metrics:consumer_cancelled(self(),
32183218
stream_r(S, Connection),
3219-
SubId, Username, false)
3219+
SubId, Username)
32203220
|| #consumer{configuration =
32213221
#consumer_configuration{stream = S,
32223222
subscription_id = SubId}}
@@ -3298,8 +3298,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
32983298
stream_r(Stream,
32993299
C0),
33003300
SubId,
3301-
Username,
3302-
false),
3301+
Username),
33033302
maybe_unregister_consumer(
33043303
VirtualHost, Consumer,
33053304
single_active_consumer(Consumer),
@@ -3310,8 +3309,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33103309
stream_r(Stream,
33113310
C0),
33123311
SubId,
3313-
Username,
3314-
false),
3312+
Username),
33153313
maybe_unregister_consumer(
33163314
VirtualHost, Consumer,
33173315
single_active_consumer(Consumer),
@@ -3428,8 +3426,7 @@ remove_subscription(SubscriptionId,
34283426
virtual_host = VirtualHost,
34293427
outstanding_requests = Requests0,
34303428
stream_subscriptions = StreamSubscriptions} = Connection,
3431-
#stream_connection_state{consumers = Consumers} = State,
3432-
Notify) ->
3429+
#stream_connection_state{consumers = Consumers} = State) ->
34333430
#{SubscriptionId := Consumer} = Consumers,
34343431
#consumer{log = Log,
34353432
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
@@ -3456,8 +3453,7 @@ remove_subscription(SubscriptionId,
34563453
rabbit_stream_metrics:consumer_cancelled(self(),
34573454
stream_r(Stream, Connection2),
34583455
SubscriptionId,
3459-
Username,
3460-
Notify),
3456+
Username),
34613457

34623458
Requests1 = maybe_unregister_consumer(
34633459
VirtualHost, Consumer,

0 commit comments

Comments
 (0)