diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index b964fb1a1276..6d51223b381f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1984,7 +1984,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, State -> {ok, State} catch throw:consuming_queue_down -> - {error, consuming_queue_down} + {error, consuming_queue_down} end; {eol, QStates1, QRef} -> {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QRef, U0), @@ -1992,12 +1992,25 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason}, State = State0#state{queue_states = QStates, unacked_client_pubs = U}, send_puback(ConfirmPktIds, ?RC_SUCCESS, State), - {ok, State} + try handle_queue_down(QName, State) of + State -> + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down} + end end. -spec handle_queue_event( {queue_event, rabbit_amqqueue:name() | ?QUEUE_TYPE_QOS_0, term()}, state()) -> {ok, state()} | {error, Reason :: any(), state()}. +handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, {queue_down, QName}}, + State0) -> + try handle_queue_down(QName, State0) of + State -> + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down, State0} + end; handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg}, State0 = #state{qos0_messages_dropped = N}) -> State = case drop_qos0_message(State0) of @@ -2018,13 +2031,17 @@ handle_queue_event({queue_event, QName, Evt}, State = handle_queue_actions(Actions, State1), {ok, State}; {eol, Actions} -> - State1 = handle_queue_actions(Actions, State0), - {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0), - QStates = rabbit_queue_type:remove(QName, QStates0), - State = State1#state{queue_states = QStates, - unacked_client_pubs = U}, - send_puback(ConfirmPktIds, ?RC_SUCCESS, State), - {ok, State}; + try + State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0), + {ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0), + QStates = rabbit_queue_type:remove(QName, QStates0), + State = State1#state{queue_states = QStates, + unacked_client_pubs = U}, + send_puback(ConfirmPktIds, ?RC_SUCCESS, State), + {ok, State} + catch throw:consuming_queue_down -> + {error, consuming_queue_down, State0} + end; {protocol_error, _Type, _Reason, _ReasonArgs} = Error -> {error, Error, State0} end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl index 785a88a9aea3..55d5a2ca80f6 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl @@ -126,6 +126,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) -> log_delete(QName, amqqueue:get_exclusive_owner(Q)), case rabbit_amqqueue:internal_delete(Q, ActingUser) of ok -> + Pid = amqqueue:get_pid(Q), + delegate:invoke_no_result([Pid], {gen_server, cast, [{queue_event, ?MODULE, {queue_down, QName}}]}), {ok, 0}; {error, timeout} = Err -> Err diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 91632644874c..07ebabe6915f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -131,6 +131,8 @@ handle_cast(QueueEvent = {queue_event, _, _}, try rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of {ok, PState} -> maybe_process_deferred_recv(control_throttle(pstate(State, PState))); + {error, consuming_queue_down = Reason, PState} -> + {stop, {shutdown, Reason}, pstate(State, PState)}; {error, Reason0, PState} -> {stop, Reason0, pstate(State, PState)} catch throw:{send_failed, Reason1} -> diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index be11044f7f4b..8bb037d5ef5f 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -24,6 +24,7 @@ -import(rabbit_ct_broker_helpers, [rabbitmqctl_list/3, + rabbitmqctl/3, rpc/4, rpc/5, rpc_all/4, @@ -125,6 +126,9 @@ cluster_size_1_tests() -> ,retained_message_conversion ,bind_exchange_to_exchange ,bind_exchange_to_exchange_single_message + ,notify_consumer_classic_queue_deleted + ,notify_consumer_quorum_queue_deleted + ,notify_consumer_qos0_queue_deleted ]. cluster_size_3_tests() -> @@ -167,8 +171,8 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(mqtt, Config) -> - rabbit_ct_helpers:set_config(Config, {websocket, false}); +init_per_group(mqtt, Config0) -> + rabbit_ct_helpers:set_config(Config0, {websocket, false}); init_per_group(Group, Config) when Group =:= v3; Group =:= v4; @@ -208,7 +212,8 @@ init_per_testcase(T, Config) init_per_testcase(T, Config) when T =:= clean_session_disconnect_client; T =:= clean_session_node_restart; - T =:= clean_session_node_kill -> + T =:= clean_session_node_kill; + T =:= notify_consumer_qos0_queue_deleted -> ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]), init_per_testcase0(T, Config); init_per_testcase(Testcase, Config) -> @@ -225,7 +230,8 @@ end_per_testcase(T, Config) end_per_testcase(T, Config) when T =:= clean_session_disconnect_client; T =:= clean_session_node_restart; - T =:= clean_session_node_kill -> + T =:= clean_session_node_kill; + T =:= notify_consumer_qos0_queue_deleted -> ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]), end_per_testcase0(T, Config); end_per_testcase(Testcase, Config) -> @@ -324,9 +330,7 @@ will_without_disconnect(Config) -> %% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties. %% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252 decode_basic_properties(Config) -> - App = rabbitmq_mqtt, - Par = durable_queue_type, - ok = rpc(Config, application, set_env, [App, Par, quorum]), + set_durable_queue_type(Config), ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config, non_clean_sess_opts()), {ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1), @@ -340,7 +344,7 @@ decode_basic_properties(Config) -> ok = emqtt:disconnect(C1), C2 = connect(ClientId, Config, [{clean_start, true}]), ok = emqtt:disconnect(C2), - ok = rpc(Config, application, unset_env, [App, Par]), + unset_durable_queue_type(Config), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). quorum_queue_rejects(Config) -> @@ -1955,6 +1959,35 @@ bind_exchange_to_exchange_single_message(Config) -> ok = emqtt:disconnect(C), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). +notify_consumer_qos0_queue_deleted(Config) -> + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"MQTT QoS 0">>, [{retry_interval, 1}], qos0). + +notify_consumer_classic_queue_deleted(Config) -> + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"classic">>, non_clean_sess_opts(), qos0). + +notify_consumer_quorum_queue_deleted(Config) -> + set_durable_queue_type(Config), + Topic = atom_to_binary(?FUNCTION_NAME), + notify_consumer_queue_deleted(Config, Topic, <<"quorum">>, non_clean_sess_opts(), qos1), + unset_durable_queue_type(Config). + +notify_consumer_queue_deleted(Config, Name = Topic, ExpectedType, ConnOpts, Qos) -> + C = connect(Name, Config, ConnOpts), + {ok, _, _} = emqtt:subscribe(C, Topic, Qos), + {ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1), + {ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1), + ok = expect_publishes(C, Topic, [<<"m1">>, <<"m2">>]), + + [[QName, Type]] = rabbitmqctl_list(Config, 0, ["list_queues", "name", "type", "--no-table-headers"]), + ?assertMatch(ExpectedType, Type), + + process_flag(trap_exit, true), + {ok, _} = rabbitmqctl(Config, 0, ["delete_queue", QName]), + + await_exit(C). + %% ------------------------------------------------------------------- %% Internal helpers %% ------------------------------------------------------------------- @@ -1985,7 +2018,7 @@ await_confirms_unordered(From, Left) -> end. await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QueueName = rabbit_mqtt_util:queue_name_bin( rabbit_data_coercion:to_binary(ClientId), QoS), eventually( @@ -2030,3 +2063,9 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) -> after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server") end end. + +set_durable_queue_type(Config) -> + ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]). + +unset_durable_queue_type(Config) -> + ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]). diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index bbe37b56a9c7..e2b3f006725e 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -100,3 +100,6 @@ duplicate_client_id(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos0(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). publish_to_all_queue_types_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).