diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 23c7e2d0725e..a34ed96c3345 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -972,9 +972,15 @@ init(Q) when ?is_amqqueue(Q) -> E end. -close(#stream_client{readers = Readers}) -> - maps:foreach(fun (_, #stream{log = Log}) -> - osiris_log:close(Log) +close(#stream_client{readers = Readers, + name = QName}) -> + maps:foreach(fun (CTag, #stream{log = Log}) -> + close_log(Log), + rabbit_core_metrics:consumer_deleted(self(), CTag, QName), + rabbit_event:notify(consumer_deleted, + [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]) end, Readers). update(Q, State) diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index d45ed0fb71ad..3815f5df6bac 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -143,6 +143,7 @@ all_tests_3() -> consume_credit_out_of_order_ack, consume_credit_multiple_ack, basic_cancel, + consumer_metrics_cleaned_on_connection_close, receive_basic_cancel_on_queue_deletion, keep_consuming_on_leader_restart, max_length_bytes, @@ -1184,6 +1185,33 @@ basic_cancel(Config) -> end, rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +consumer_metrics_cleaned_on_connection_close(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Q = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + Conn = rabbit_ct_client_helpers:open_connection(Config, Server), + {ok, Ch} = amqp_connection:open_channel(Conn), + qos(Ch, 10, false), + CTag = <<"consumer_metrics_cleaned_on_connection_close">>, + subscribe(Ch, Q, false, 0, CTag), + rabbit_ct_helpers:await_condition( + fun() -> + 1 == length(filter_consumers(Config, Server, CTag)) + end, 30000), + + ok = rabbit_ct_client_helpers:close_connection(Conn), + + rabbit_ct_helpers:await_condition( + fun() -> + 0 == length(filter_consumers(Config, Server, CTag)) + end, 30000), + + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + + receive_basic_cancel_on_queue_deletion(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),