Skip to content

Commit c5edd60

Browse files
Merge pull request #13105 from rabbitmq/stream-consume-cancel-emit-events
Emit internal events on stream consume and cancel
2 parents 5078a74 + 31a4d61 commit c5edd60

File tree

8 files changed

+300
-58
lines changed

8 files changed

+300
-58
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,8 @@ consume(Q, Spec, #stream_client{} = QState0)
313313
consumer_tag := ConsumerTag,
314314
exclusive_consume := ExclusiveConsume,
315315
args := Args,
316-
ok_msg := OkMsg} = Spec,
316+
ok_msg := OkMsg,
317+
acting_user := ActingUser} = Spec,
317318
QName = amqqueue:get_name(Q),
318319
rabbit_log:debug("~s:~s Local pid resolved ~0p",
319320
[?MODULE, ?FUNCTION_NAME, LocalPid]),
@@ -330,6 +331,15 @@ consume(Q, Spec, #stream_client{} = QState0)
330331
rabbit_core_metrics:consumer_created(
331332
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
332333
QName, ConsumerPrefetchCount, true, up, Args),
334+
rabbit_event:notify(consumer_created,
335+
[{consumer_tag, ConsumerTag},
336+
{exclusive, ExclusiveConsume},
337+
{ack_required, AckRequired},
338+
{channel, ChPid},
339+
{queue, QName},
340+
{prefetch_count, ConsumerPrefetchCount},
341+
{arguments, Args},
342+
{user_who_performed_action, ActingUser}]),
333343
%% reply needs to be sent before the stream
334344
%% begins sending
335345
maybe_send_reply(ChPid, OkMsg),
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
%% The contents of this file are subject to the Mozilla Public License
2+
%% Version 2.0 (the "License"); you may not use this file except in
3+
%% compliance with the License. You may obtain a copy of the License
4+
%% at https://www.mozilla.org/en-US/MPL/2.0/
5+
%%
6+
%% Software distributed under the License is distributed on an "AS IS"
7+
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
%% the License for the specific language governing rights and
9+
%% limitations under the License.
10+
%%
11+
%% The Original Code is RabbitMQ.
12+
%%
13+
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
14+
%% Copyright (c) 2025 Broadcom. All Rights Reserved.
15+
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
16+
%%
17+
18+
-module(rabbit_list_test_event_handler).
19+
20+
-behaviour(gen_event).
21+
22+
-export([start_link/0, stop/0, get_events/0, clear_events/0]).
23+
24+
%% callbacks
25+
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
26+
27+
start_link() ->
28+
gen_event:start_link({local, ?MODULE}).
29+
30+
stop() ->
31+
gen_event:stop(?MODULE).
32+
33+
get_events() ->
34+
gen_event:call(?MODULE, ?MODULE, get_events).
35+
36+
clear_events() ->
37+
gen_event:call(?MODULE, ?MODULE, clear_events).
38+
39+
%% Callbacks
40+
41+
init([]) ->
42+
{ok, []}.
43+
44+
handle_event(Event, State) ->
45+
{ok, [Event | State]}.
46+
47+
handle_call(get_events, State) ->
48+
{ok, lists:reverse(State), State};
49+
handle_call(clear_events, _) ->
50+
{ok, ok, []}.
51+
52+
handle_info(_Info, State) ->
53+
{ok, State}.
54+
55+
terminate(_Reason, _State) ->
56+
ok.
57+
58+
code_change(_OldVsn, State, _Extra) ->
59+
{ok, State}.

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ all_tests_3() ->
144144
consume_credit_multiple_ack,
145145
basic_cancel,
146146
consumer_metrics_cleaned_on_connection_close,
147+
consume_cancel_should_create_events,
147148
receive_basic_cancel_on_queue_deletion,
148149
keep_consuming_on_leader_restart,
149150
max_length_bytes,
@@ -1195,7 +1196,7 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
11951196
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
11961197
{ok, Ch} = amqp_connection:open_channel(Conn),
11971198
qos(Ch, 10, false),
1198-
CTag = <<"consumer_metrics_cleaned_on_connection_close">>,
1199+
CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
11991200
subscribe(Ch, Q, false, 0, CTag),
12001201
rabbit_ct_helpers:await_condition(
12011202
fun() ->
@@ -1211,6 +1212,49 @@ consumer_metrics_cleaned_on_connection_close(Config) ->
12111212

12121213
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
12131214

1215+
consume_cancel_should_create_events(Config) ->
1216+
HandlerMod = rabbit_list_test_event_handler,
1217+
rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, HandlerMod),
1218+
rabbit_ct_broker_helpers:rpc(Config, 0,
1219+
gen_event,
1220+
add_handler,
1221+
[rabbit_event, HandlerMod, []]),
1222+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1223+
1224+
Q = ?config(queue_name, Config),
1225+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1226+
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1227+
1228+
Conn = rabbit_ct_client_helpers:open_connection(Config, Server),
1229+
{ok, Ch} = amqp_connection:open_channel(Conn),
1230+
qos(Ch, 10, false),
1231+
1232+
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
1233+
gen_event,
1234+
call,
1235+
[rabbit_event, HandlerMod, clear_events]),
1236+
1237+
CTag = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
1238+
1239+
?assertEqual([], filtered_events(Config, consumer_created, CTag)),
1240+
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),
1241+
1242+
subscribe(Ch, Q, false, 0, CTag),
1243+
1244+
?awaitMatch([{event, consumer_created, _, _, _}], filtered_events(Config, consumer_created, CTag), ?WAIT),
1245+
?assertEqual([], filtered_events(Config, consumer_deleted, CTag)),
1246+
1247+
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
1248+
1249+
?awaitMatch([{event, consumer_deleted, _, _, _}], filtered_events(Config, consumer_deleted, CTag), ?WAIT),
1250+
1251+
rabbit_ct_broker_helpers:rpc(Config, 0,
1252+
gen_event,
1253+
delete_handler,
1254+
[rabbit_event, HandlerMod, []]),
1255+
1256+
ok = rabbit_ct_client_helpers:close_connection(Conn),
1257+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
12141258

12151259
receive_basic_cancel_on_queue_deletion(Config) ->
12161260
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1395,6 +1439,18 @@ filter_consumers(Config, Server, CTag) ->
13951439
end
13961440
end, [], CInfo).
13971441

1442+
1443+
filtered_events(Config, EventType, CTag) ->
1444+
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
1445+
gen_event,
1446+
call,
1447+
[rabbit_event, rabbit_list_test_event_handler, get_events]),
1448+
lists:filter(fun({event, Type, Fields, _, _}) when Type =:= EventType ->
1449+
proplists:get_value(consumer_tag, Fields) =:= CTag;
1450+
(_) ->
1451+
false
1452+
end, Events).
1453+
13981454
consume_and_reject(Config) ->
13991455
consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end).
14001456
consume_and_nack(Config) ->

deps/rabbitmq_stream/src/rabbit_stream_metrics.erl

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
%% API
2222
-export([init/0]).
23-
-export([consumer_created/9,
23+
-export([consumer_created/10,
2424
consumer_updated/9,
25-
consumer_cancelled/4]).
25+
consumer_cancelled/5]).
2626
-export([publisher_created/4,
2727
publisher_updated/7,
2828
publisher_deleted/3]).
@@ -42,7 +42,8 @@ consumer_created(Connection,
4242
Offset,
4343
OffsetLag,
4444
Active,
45-
Properties) ->
45+
Properties,
46+
ActingUser) ->
4647
Values =
4748
[{credits, Credits},
4849
{consumed, MessageCount},
@@ -55,16 +56,32 @@ consumer_created(Connection,
5556
ets:insert(?TABLE_CONSUMER,
5657
{{StreamResource, Connection, SubscriptionId}, Values}),
5758
rabbit_global_counters:consumer_created(stream),
58-
rabbit_core_metrics:consumer_created(Connection,
59-
consumer_tag(SubscriptionId),
60-
false,
61-
false,
59+
CTag = consumer_tag(SubscriptionId),
60+
ExclusiveConsume = false,
61+
AckRequired = false,
62+
Pid = Connection,
63+
PrefetchCount = 0,
64+
Args = rabbit_misc:to_amqp_table(Properties),
65+
rabbit_core_metrics:consumer_created(Pid,
66+
CTag,
67+
ExclusiveConsume,
68+
AckRequired,
6269
StreamResource,
63-
0,
70+
PrefetchCount,
6471
Active,
6572
rabbit_stream_utils:consumer_activity_status(Active,
6673
Properties),
67-
rabbit_misc:to_amqp_table(Properties)),
74+
Args),
75+
76+
rabbit_event:notify(consumer_created,
77+
[{consumer_tag, CTag},
78+
{exclusive, ExclusiveConsume},
79+
{ack_required, AckRequired},
80+
{channel, Pid},
81+
{queue, StreamResource},
82+
{prefetch_count, PrefetchCount},
83+
{arguments, Args},
84+
{user_who_performed_action, ActingUser}]),
6885
ok.
6986

7087
consumer_tag(SubscriptionId) ->
@@ -104,7 +121,7 @@ consumer_updated(Connection,
104121

105122
ok.
106123

107-
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
124+
consumer_cancelled(Connection, StreamResource, SubscriptionId, ActingUser, Notify) ->
108125
ets:delete(?TABLE_CONSUMER,
109126
{StreamResource, Connection, SubscriptionId}),
110127
rabbit_global_counters:consumer_deleted(stream),
@@ -115,7 +132,8 @@ consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
115132
true ->
116133
rabbit_event:notify(consumer_deleted,
117134
[{consumer_tag, consumer_tag(SubscriptionId)},
118-
{channel, self()}, {queue, StreamResource}]);
135+
{channel, self()}, {queue, StreamResource},
136+
{user_who_performed_action, ActingUser}]);
119137
_ -> ok
120138
end,
121139
ok.

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2924,9 +2924,8 @@ consumer_name(_Properties) ->
29242924
maybe_dispatch_on_subscription(Transport,
29252925
State,
29262926
ConsumerState,
2927-
#stream_connection{deliver_version =
2928-
DeliverVersion} =
2929-
Connection,
2927+
#stream_connection{deliver_version = DeliverVersion,
2928+
user = #user{username = Username}} = Connection,
29302929
Consumers,
29312930
Stream,
29322931
SubscriptionId,
@@ -2970,13 +2969,14 @@ maybe_dispatch_on_subscription(Transport,
29702969
ConsumerOffset,
29712970
ConsumerOffsetLag,
29722971
true,
2973-
SubscriptionProperties),
2972+
SubscriptionProperties,
2973+
Username),
29742974
State#stream_connection_state{consumers = Consumers1}
29752975
end;
29762976
maybe_dispatch_on_subscription(_Transport,
29772977
State,
29782978
ConsumerState,
2979-
Connection,
2979+
#stream_connection{user = #user{username = Username}} = Connection,
29802980
Consumers,
29812981
Stream,
29822982
SubscriptionId,
@@ -3000,7 +3000,8 @@ maybe_dispatch_on_subscription(_Transport,
30003000
Offset,
30013001
0, %% offset lag
30023002
Active,
3003-
SubscriptionProperties),
3003+
SubscriptionProperties,
3004+
Username),
30043005
Consumers1 = Consumers#{SubscriptionId => ConsumerState},
30053006
State#stream_connection_state{consumers = Consumers1}.
30063007

@@ -3205,19 +3206,15 @@ partition_index(VirtualHost, Stream, Properties) ->
32053206
-1
32063207
end.
32073208

3208-
notify_connection_closed(#statem_data{connection =
3209-
#stream_connection{name = Name,
3210-
publishers =
3211-
Publishers} =
3212-
Connection,
3213-
connection_state =
3214-
#stream_connection_state{consumers =
3215-
Consumers} =
3216-
ConnectionState}) ->
3209+
notify_connection_closed(#statem_data{
3210+
connection = #stream_connection{name = Name,
3211+
user = #user{username = Username},
3212+
publishers = Publishers} = Connection,
3213+
connection_state = #stream_connection_state{consumers = Consumers} = ConnectionState}) ->
32173214
rabbit_core_metrics:connection_closed(self()),
32183215
[rabbit_stream_metrics:consumer_cancelled(self(),
32193216
stream_r(S, Connection),
3220-
SubId, false)
3217+
SubId, Username, false)
32213218
|| #consumer{configuration =
32223219
#consumer_configuration{stream = S,
32233220
subscription_id = SubId}}
@@ -3275,24 +3272,15 @@ clean_state_after_super_stream_deletion(Partitions, Connection, State, Transport
32753272
end, {Connection, State}, Partitions).
32763273

32773274
clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
3278-
#stream_connection{virtual_host =
3279-
VirtualHost,
3280-
stream_subscriptions
3281-
=
3282-
StreamSubscriptions,
3283-
publishers =
3284-
Publishers,
3285-
publisher_to_ids
3286-
=
3287-
PublisherToIds,
3288-
stream_leaders =
3289-
Leaders,
3290-
outstanding_requests = Requests0} =
3291-
C0,
3292-
#stream_connection_state{consumers
3293-
=
3294-
Consumers} =
3295-
S0) ->
3275+
#stream_connection{
3276+
user = #user{username = Username},
3277+
virtual_host = VirtualHost,
3278+
stream_subscriptions = StreamSubscriptions,
3279+
publishers = Publishers,
3280+
publisher_to_ids = PublisherToIds,
3281+
stream_leaders = Leaders,
3282+
outstanding_requests = Requests0} = C0,
3283+
#stream_connection_state{consumers = Consumers} = S0) ->
32963284
{SubscriptionsCleaned, C1, S1} =
32973285
case stream_has_subscriptions(Stream, C0) of
32983286
true ->
@@ -3306,6 +3294,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33063294
stream_r(Stream,
33073295
C0),
33083296
SubId,
3297+
Username,
33093298
false),
33103299
maybe_unregister_consumer(
33113300
VirtualHost, Consumer,
@@ -3317,6 +3306,7 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
33173306
stream_r(Stream,
33183307
C0),
33193308
SubId,
3309+
Username,
33203310
false),
33213311
maybe_unregister_consumer(
33223312
VirtualHost, Consumer,
@@ -3429,11 +3419,11 @@ lookup_leader_from_manager(VirtualHost, Stream) ->
34293419
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
34303420

34313421
remove_subscription(SubscriptionId,
3432-
#stream_connection{virtual_host = VirtualHost,
3433-
outstanding_requests = Requests0,
3434-
stream_subscriptions =
3435-
StreamSubscriptions} =
3436-
Connection,
3422+
#stream_connection{
3423+
user = #user{username = Username},
3424+
virtual_host = VirtualHost,
3425+
outstanding_requests = Requests0,
3426+
stream_subscriptions = StreamSubscriptions} = Connection,
34373427
#stream_connection_state{consumers = Consumers} = State,
34383428
Notify) ->
34393429
#{SubscriptionId := Consumer} = Consumers,
@@ -3462,6 +3452,7 @@ remove_subscription(SubscriptionId,
34623452
rabbit_stream_metrics:consumer_cancelled(self(),
34633453
stream_r(Stream, Connection2),
34643454
SubscriptionId,
3455+
Username,
34653456
Notify),
34663457

34673458
Requests1 = maybe_unregister_consumer(

0 commit comments

Comments
 (0)