From 8ba2b4134ee97efbe0535a332a8c0498af9ab0b1 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Sun, 1 Jun 2025 16:43:39 +0200 Subject: [PATCH 1/6] MQTT: disconnect consumer when queue is deleted Queues are automatically declared for MQTT consumers, but they can be externally deleted. The consumer should be disconnected in such case, because it has no way of knowing this happened - from its perspective there are simply no messages to consume. In RabbitMQ 3.11 the consumer was disconnected in such situation. This behaviour changed with native MQTT, which doesn't use AMQP internally. (cherry picked from commit bf468bdd5215cd6fe3337398734c4b704ce627b1) # Conflicts: # deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl --- .../src/rabbit_mqtt_processor.erl | 35 ++++++--- .../src/rabbit_mqtt_qos0_queue.erl | 2 + deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 2 + deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 76 +++++++++++++++++-- 4 files changed, 101 insertions(+), 14 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index ad8d34085364..6888fcd66b6e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1976,7 +1976,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, State -> {ok, State} catch throw:consuming_queue_down -> - {error, consuming_queue_down} + {error, consuming_queue_down} end; {eol, QStates1, QRef} -> {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QRef, U0), @@ -1984,12 +1984,25 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, State = State0#state{queue_states = QStates, unacked_client_pubs = U}, send_puback(ConfirmPktIds, ?RC_SUCCESS, State), - {ok, State} + try handle_queue_down(QName, State) of + State -> + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down} + end end. -spec handle_queue_event( {queue_event, rabbit_amqqueue:name() | ?QUEUE_TYPE_QOS_0, term()}, state()) -> {ok, state()} | {error, Reason :: any(), state()}. +handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, {queue_down, QName}}, + State0) -> + try handle_queue_down(QName, State0) of + State -> + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down, State0} + end; handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg}, State0 = #state{qos0_messages_dropped = N}) -> State = case drop_qos0_message(State0) of @@ -2010,13 +2023,17 @@ handle_queue_event({queue_event, QName, Evt}, State = handle_queue_actions(Actions, State1), {ok, State}; {eol, Actions} -> - State1 = handle_queue_actions(Actions, State0), - {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0), - QStates = rabbit_queue_type:remove(QName, QStates0), - State = State1#state{queue_states = QStates, - unacked_client_pubs = U}, - send_puback(ConfirmPktIds, ?RC_SUCCESS, State), - {ok, State}; + try + State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0), + {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0), + QStates = rabbit_queue_type:remove(QName, QStates0), + State = State1#state{queue_states = QStates, + unacked_client_pubs = U}, + send_puback(ConfirmPktIds, ?RC_SUCCESS, State), + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down, State0} + end; {protocol_error, _Type, _Reason, _ReasonArgs} = Error -> {error, Error, State0} end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index d0201e7a7d9f..0fb6d63e112c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -116,6 +116,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) -> log_delete(QName, amqqueue:get_exclusive_owner(Q)), case rabbit_amqqueue:internal_delete(Q, ActingUser) of ok -> + Pid = amqqueue:get_pid(Q), + delegate:invoke_no_result([Pid], {gen_server, cast, [{queue_event, ?MODULE, {queue_down, QName}}]}), {ok, 0}; {error, timeout} = Err -> Err diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 91632644874c..07ebabe6915f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -131,6 +131,8 @@ handle_cast(QueueEvent = {queue_event, _, _}, try rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of {ok, PState} -> maybe_process_deferred_recv(control_throttle(pstate(State, PState))); + {error, consuming_queue_down = Reason, PState} -> + {stop, {shutdown, Reason}, pstate(State, PState)}; {error, Reason0, PState} -> {stop, Reason0, pstate(State, PState)} catch throw:{send_failed, Reason1} -> diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 7d10cf13a580..021c8a49b972 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -24,6 +24,7 @@ -import(rabbit_ct_broker_helpers, [rabbitmqctl_list/3, + rabbitmqctl/3, rpc/4, rpc/5, rpc_all/4, @@ -125,6 +126,9 @@ cluster_size_1_tests() -> ,retained_message_conversion ,bind_exchange_to_exchange ,bind_exchange_to_exchange_single_message + ,notify_consumer_classic_queue_deleted + ,notify_consumer_quorum_queue_deleted + ,notify_consumer_qos0_queue_deleted ]. cluster_size_3_tests() -> @@ -167,8 +171,8 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(mqtt, Config) -> - rabbit_ct_helpers:set_config(Config, {websocket, false}); +init_per_group(mqtt, Config0) -> + rabbit_ct_helpers:set_config(Config0, {websocket, false}); init_per_group(Group, Config) when Group =:= v3; Group =:= v4; @@ -205,6 +209,16 @@ init_per_testcase(T, Config) T =:= management_plugin_enable -> inets:start(), init_per_testcase0(T, Config); +<<<<<<< HEAD +======= +init_per_testcase(T, Config) + when T =:= clean_session_disconnect_client; + T =:= clean_session_node_restart; + T =:= clean_session_node_kill; + T =:= notify_consumer_qos0_queue_deleted -> + ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), + init_per_testcase0(T, Config); +>>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -216,6 +230,16 @@ end_per_testcase(T, Config) T =:= management_plugin_enable -> ok = inets:stop(), end_per_testcase0(T, Config); +<<<<<<< HEAD +======= +end_per_testcase(T, Config) + when T =:= clean_session_disconnect_client; + T =:= clean_session_node_restart; + T =:= clean_session_node_kill; + T =:= notify_consumer_qos0_queue_deleted -> + ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), + end_per_testcase0(T, Config); +>>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) end_per_testcase(Testcase, Config) -> end_per_testcase0(Testcase, Config). @@ -307,9 +331,7 @@ will_without_disconnect(Config) -> %% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties. %% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252 decode_basic_properties(Config) -> - App = rabbitmq_mqtt, - Par = durable_queue_type, - ok = rpc(Config, application, set_env, [App, Par, quorum]), + set_durable_queue_type(Config), ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config, non_clean_sess_opts()), {ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1), @@ -323,7 +345,12 @@ decode_basic_properties(Config) -> ok = emqtt:disconnect(C1), C2 = connect(ClientId, Config, [{clean_start, true}]), ok = emqtt:disconnect(C2), +<<<<<<< HEAD ok = rpc(Config, application, unset_env, [App, Par]). +======= + unset_durable_queue_type(Config), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). +>>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) quorum_queue_rejects(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), @@ -1906,6 +1933,35 @@ bind_exchange_to_exchange_single_message(Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q})), ok = emqtt:disconnect(C). +notify_consumer_qos0_queue_deleted(Config) -> + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"MQTT QoS 0">>, [{retry_interval, 1}], qos0). + +notify_consumer_classic_queue_deleted(Config) -> + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"classic">>, non_clean_sess_opts(), qos0). + +notify_consumer_quorum_queue_deleted(Config) -> + set_durable_queue_type(Config), + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"quorum">>, non_clean_sess_opts(), qos1), + unset_durable_queue_type(Config). + +notify_consumer_queue_deleted(Config, Name = Topic, ExpectedType, ConnOpts, Qos) -> + C = connect(Name, Config, ConnOpts), + {ok, _, _} = emqtt:subscribe(C, Topic, Qos), + {ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1), + {ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1), + ok = expect_publishes(C, Topic, [<<"m1">>, <<"m2">>]), + + [[QName, Type]] = rabbitmqctl_list(Config, 0, ["list_queues", "name", "type", "--no-table-headers"]), + ?assertMatch(ExpectedType, Type), + + process_flag(trap_exit, true), + {ok, _} = rabbitmqctl(Config, 0, ["delete_queue", QName]), + + await_exit(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- @@ -1936,7 +1992,11 @@ await_confirms_unordered(From, Left) -> end. await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> +<<<<<<< HEAD Ch = rabbit_ct_client_helpers:open_channel(Config), +======= + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), +>>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) QueueName = rabbit_mqtt_util:queue_name_bin( rabbit_data_coercion:to_binary(ClientId), QoS), eventually( @@ -1981,3 +2041,9 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) -> after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server") end end. + +set_durable_queue_type(Config) -> + ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]). + +unset_durable_queue_type(Config) -> + ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]). From 30854bb14705bbf196d42904702ae7656826c70f Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 1 Jun 2025 19:29:28 +0400 Subject: [PATCH 2/6] web_mqtt: propagate notify_consumer_classic_queue_deleted to mqtt_shared_SUITE (cherry picked from commit 9eaa22066b729ba3181e411771e62e2230ecd84c) --- deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index 693345dc4cec..ceb2bbab3c35 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -100,3 +100,4 @@ duplicate_client_id(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos0(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). \ No newline at end of file From 7490a99c8257b129af427a6b9937842cc5e6c77a Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Sun, 1 Jun 2025 17:37:57 +0200 Subject: [PATCH 3/6] web_mqtt: propagate notify_consumer_quorum/qos0_queue_deleted to mqtt_shared_SUITE (cherry picked from commit d91c9d61d45b34f48d241e0422f988efbd208c01) --- deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index ceb2bbab3c35..8083d481578f 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -100,4 +100,6 @@ duplicate_client_id(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos0(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). -notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). \ No newline at end of file +notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). From c1e1023139d827175e742d0e08b6be2053736528 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 2 Jun 2025 16:20:57 +0200 Subject: [PATCH 4/6] REVERT try ubuntu 22.04 (cherry picked from commit 610c83867efb5002f773ad52c74d061b4b7ac470) --- .github/workflows/test-make-target.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 15843138c946..9932438449ff 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -24,7 +24,7 @@ on: jobs: test: name: ${{ inputs.plugin }} (${{ inputs.make_target }}) - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 timeout-minutes: 60 steps: - name: CHECKOUT REPOSITORY From 22f8657eb054c67919a8769f387c03260c4f8c14 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 3 Jun 2025 00:39:47 +0400 Subject: [PATCH 5/6] Revert "REVERT try ubuntu 22.04" This reverts commit 5a0260440539a7e350d410f8f046164d582cd7f0. (cherry picked from commit b48ab7246d21d9e964a3309dc500fe98cb05c48c) --- .github/workflows/test-make-target.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 9932438449ff..15843138c946 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -24,7 +24,7 @@ on: jobs: test: name: ${{ inputs.plugin }} (${{ inputs.make_target }}) - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest timeout-minutes: 60 steps: - name: CHECKOUT REPOSITORY From 088452fe970dd37fced622ca3aa8b8b31c59ec92 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 3 Jun 2025 09:06:01 +0400 Subject: [PATCH 6/6] Resolve a conflict #13996 #14013 --- deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 021c8a49b972..1db38072c43c 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -209,8 +209,6 @@ init_per_testcase(T, Config) T =:= management_plugin_enable -> inets:start(), init_per_testcase0(T, Config); -<<<<<<< HEAD -======= init_per_testcase(T, Config) when T =:= clean_session_disconnect_client; T =:= clean_session_node_restart; @@ -218,7 +216,6 @@ init_per_testcase(T, Config) T =:= notify_consumer_qos0_queue_deleted -> ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), init_per_testcase0(T, Config); ->>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) init_per_testcase(Testcase, Config) -> init_per_testcase0(Testcase, Config). @@ -230,8 +227,6 @@ end_per_testcase(T, Config) T =:= management_plugin_enable -> ok = inets:stop(), end_per_testcase0(T, Config); -<<<<<<< HEAD -======= end_per_testcase(T, Config) when T =:= clean_session_disconnect_client; T =:= clean_session_node_restart; @@ -239,7 +234,6 @@ end_per_testcase(T, Config) T =:= notify_consumer_qos0_queue_deleted -> ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), end_per_testcase0(T, Config); ->>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) end_per_testcase(Testcase, Config) -> end_per_testcase0(Testcase, Config). @@ -337,7 +331,7 @@ decode_basic_properties(Config) -> {ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1), QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]), ?assertEqual(1, length(QuorumQueues)), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>, routing_key = Topic}, #amqp_msg{payload = Payload}), @@ -345,12 +339,8 @@ decode_basic_properties(Config) -> ok = emqtt:disconnect(C1), C2 = connect(ClientId, Config, [{clean_start, true}]), ok = emqtt:disconnect(C2), -<<<<<<< HEAD - ok = rpc(Config, application, unset_env, [App, Par]). -======= unset_durable_queue_type(Config), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). ->>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) quorum_queue_rejects(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), @@ -1992,11 +1982,7 @@ await_confirms_unordered(From, Left) -> end. await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> -<<<<<<< HEAD - Ch = rabbit_ct_client_helpers:open_channel(Config), -======= {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), ->>>>>>> bf468bdd5 (MQTT: disconnect consumer when queue is deleted) QueueName = rabbit_mqtt_util:queue_name_bin( rabbit_data_coercion:to_binary(ClientId), QoS), eventually(