Skip to content

Commit 22f0d99

Browse files
committed
Clean up management stats for stream consumer
1 parent 924b451 commit 22f0d99

File tree

1 file changed

+27
-7
lines changed

1 file changed

+27
-7
lines changed

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,35 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notif
128128
rabbit_core_metrics:consumer_deleted(Connection,
129129
consumer_tag(SubscriptionId),
130130
StreamResource),
131+
132+
propagate_consumer_cancellation(StreamResource, SubscriptionId, ActingUser,
133+
Notify),
134+
ok.
135+
136+
propagate_consumer_cancellation(StreamResource, SubscriptionId, ActingUser,
137+
Notify) ->
138+
Props = consumer_cancellation_event(StreamResource,
139+
SubscriptionId,
140+
ActingUser),
141+
Type = consumer_deleted,
131142
case Notify of
132143
true ->
133-
rabbit_event:notify(consumer_deleted,
134-
[{consumer_tag, consumer_tag(SubscriptionId)},
135-
{channel, self()}, {queue, StreamResource},
136-
{user_who_performed_action, ActingUser}]);
137-
_ -> ok
138-
end,
139-
ok.
144+
rabbit_event:notify(Type, Props);
145+
false ->
146+
%% creating record without referencing it
147+
Evt = {event, Type, Props, none, os:system_time(millisecond)},
148+
try
149+
gen_server:cast(rabbit_mgmt_metrics_gc:name(Type), {event, Evt})
150+
catch
151+
_:_ ->
152+
ok
153+
end
154+
end.
155+
156+
consumer_cancellation_event(StreamResource, SubscriptionId, ActingUser) ->
157+
[{consumer_tag, consumer_tag(SubscriptionId)},
158+
{channel, self()}, {queue, StreamResource},
159+
{user_who_performed_action, ActingUser}].
140160

141161
publisher_created(Connection,
142162
StreamResource,

0 commit comments

Comments
 (0)