Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1984,20 +1984,33 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
State ->
{ok, State}
catch throw:consuming_queue_down ->
{error, consuming_queue_down}
{error, consuming_queue_down}
end;
{eol, QStates1, QRef} ->
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QRef, U0),
QStates = rabbit_queue_type:remove(QRef, QStates1),
State = State0#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State}
try handle_queue_down(QName, State) of
State ->
{ok, State}
catch throw:consuming_queue_down ->
{error, consuming_queue_down}
end
end.

-spec handle_queue_event(
{queue_event, rabbit_amqqueue:name() | ?QUEUE_TYPE_QOS_0, term()}, state()) ->
{ok, state()} | {error, Reason :: any(), state()}.
handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, {queue_down, QName}},
State0) ->
try handle_queue_down(QName, State0) of
State ->
{ok, State}
catch throw:consuming_queue_down ->
{error, consuming_queue_down, State0}
end;
handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg},
State0 = #state{qos0_messages_dropped = N}) ->
State = case drop_qos0_message(State0) of
Expand All @@ -2018,13 +2031,17 @@ handle_queue_event({queue_event, QName, Evt},
State = handle_queue_actions(Actions, State1),
{ok, State};
{eol, Actions} ->
State1 = handle_queue_actions(Actions, State0),
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
QStates = rabbit_queue_type:remove(QName, QStates0),
State = State1#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State};
try
State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0),
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
QStates = rabbit_queue_type:remove(QName, QStates0),
State = State1#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State}
Comment on lines +2036 to +2041
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this code executed, given that the additional queue_down action will throw?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. It is executed for non consuming queues, but skipped for consuming ones. I guess the handing of queue_down should be done after sending the ack?

catch throw:consuming_queue_down ->
{error, consuming_queue_down, State0}
end;
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, State0}
end.
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
ok ->
Pid = amqqueue:get_pid(Q),
delegate:invoke_no_result([Pid], {gen_server, cast, [{queue_event, ?MODULE, {queue_down, QName}}]}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is delegate being used here? To me this seems an overkill. Just calling gen_server:cast directly is good enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we emit an eol "end of life" instead of queue_down event?
This way, we use a consistent naming with classic queues and quorum queues, and streams: They all emit an eol event if the queue is known to be deleted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delegate is there because it is replicating 0.9.1 behaviour. In this case, it ended up having just one pid to notify and I didn't realised it is unnecessary now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, in 0.9.1 the channel emits a queue_down event in this case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delegate is there because it is replicating 0.9.1 behaviour

Yes, for AMQP 0.9.1 all protocol methods have to go through delegate such that the causal order of protocol methods from the client's point of view is guaranteed to stay the same (including the AMQP 0.9.1 queue.delete method).

Copy link
Member

@ansd ansd Jun 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, in 0.9.1 the channel emits a queue_down event in this case

Where exactly is this happening? The fact that the channel handles 'DOWN' monitor messages is a relict from prior to the rabbit_queue_type API.
The more general direction after the rabbit_queue_type API got introduced is that an eol queue event/action denotes a queue deletion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line you are referencing is not executed when a classic queue got deleted. Instead the classic queue emits an eol queue action in

%% queue was deleted
{eol, []};

{ok, 0};
{error, timeout} = Err ->
Err
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ handle_cast(QueueEvent = {queue_event, _, _},
try rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
{ok, PState} ->
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
{error, consuming_queue_down = Reason, PState} ->
{stop, {shutdown, Reason}, pstate(State, PState)};
{error, Reason0, PState} ->
{stop, Reason0, pstate(State, PState)}
catch throw:{send_failed, Reason1} ->
Expand Down
57 changes: 48 additions & 9 deletions deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

-import(rabbit_ct_broker_helpers,
[rabbitmqctl_list/3,
rabbitmqctl/3,
rpc/4,
rpc/5,
rpc_all/4,
Expand Down Expand Up @@ -125,6 +126,9 @@ cluster_size_1_tests() ->
,retained_message_conversion
,bind_exchange_to_exchange
,bind_exchange_to_exchange_single_message
,notify_consumer_classic_queue_deleted
,notify_consumer_quorum_queue_deleted
,notify_consumer_qos0_queue_deleted
].

cluster_size_3_tests() ->
Expand Down Expand Up @@ -167,8 +171,8 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(mqtt, Config) ->
rabbit_ct_helpers:set_config(Config, {websocket, false});
init_per_group(mqtt, Config0) ->
rabbit_ct_helpers:set_config(Config0, {websocket, false});
init_per_group(Group, Config)
when Group =:= v3;
Group =:= v4;
Expand Down Expand Up @@ -208,7 +212,8 @@ init_per_testcase(T, Config)
init_per_testcase(T, Config)
when T =:= clean_session_disconnect_client;
T =:= clean_session_node_restart;
T =:= clean_session_node_kill ->
T =:= clean_session_node_kill;
T =:= notify_consumer_qos0_queue_deleted ->
ok = rpc(Config, rabbit_registry, register, [queue, <<"qos0">>, rabbit_mqtt_qos0_queue]),
init_per_testcase0(T, Config);
init_per_testcase(Testcase, Config) ->
Expand All @@ -225,7 +230,8 @@ end_per_testcase(T, Config)
end_per_testcase(T, Config)
when T =:= clean_session_disconnect_client;
T =:= clean_session_node_restart;
T =:= clean_session_node_kill ->
T =:= clean_session_node_kill;
T =:= notify_consumer_qos0_queue_deleted ->
ok = rpc(Config, rabbit_registry, unregister, [queue, <<"qos0">>]),
end_per_testcase0(T, Config);
end_per_testcase(Testcase, Config) ->
Expand Down Expand Up @@ -324,9 +330,7 @@ will_without_disconnect(Config) ->
%% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
%% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
decode_basic_properties(Config) ->
App = rabbitmq_mqtt,
Par = durable_queue_type,
ok = rpc(Config, application, set_env, [App, Par, quorum]),
set_durable_queue_type(Config),
ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME),
C1 = connect(ClientId, Config, non_clean_sess_opts()),
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
Expand All @@ -340,7 +344,7 @@ decode_basic_properties(Config) ->
ok = emqtt:disconnect(C1),
C2 = connect(ClientId, Config, [{clean_start, true}]),
ok = emqtt:disconnect(C2),
ok = rpc(Config, application, unset_env, [App, Par]),
unset_durable_queue_type(Config),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

quorum_queue_rejects(Config) ->
Expand Down Expand Up @@ -1955,6 +1959,35 @@ bind_exchange_to_exchange_single_message(Config) ->
ok = emqtt:disconnect(C),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).

notify_consumer_qos0_queue_deleted(Config) ->
Topic = atom_to_binary(?FUNCTION_NAME),
notify_consumer_queue_deleted(Config, Topic, <<"MQTT QoS 0">>, [{retry_interval, 1}], qos0).

notify_consumer_classic_queue_deleted(Config) ->
Topic = atom_to_binary(?FUNCTION_NAME),
notify_consumer_queue_deleted(Config, Topic, <<"classic">>, non_clean_sess_opts(), qos0).

notify_consumer_quorum_queue_deleted(Config) ->
set_durable_queue_type(Config),
Topic = atom_to_binary(?FUNCTION_NAME),
notify_consumer_queue_deleted(Config, Topic, <<"quorum">>, non_clean_sess_opts(), qos1),
unset_durable_queue_type(Config).

notify_consumer_queue_deleted(Config, Name = Topic, ExpectedType, ConnOpts, Qos) ->
C = connect(Name, Config, ConnOpts),
{ok, _, _} = emqtt:subscribe(C, Topic, Qos),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
ok = expect_publishes(C, Topic, [<<"m1">>, <<"m2">>]),

[[QName, Type]] = rabbitmqctl_list(Config, 0, ["list_queues", "name", "type", "--no-table-headers"]),
?assertMatch(ExpectedType, Type),

process_flag(trap_exit, true),
{ok, _} = rabbitmqctl(Config, 0, ["delete_queue", QName]),

await_exit(C).

%% -------------------------------------------------------------------
%% Internal helpers
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1985,7 +2018,7 @@ await_confirms_unordered(From, Left) ->
end.

await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
QueueName = rabbit_mqtt_util:queue_name_bin(
rabbit_data_coercion:to_binary(ClientId), QoS),
eventually(
Expand Down Expand Up @@ -2030,3 +2063,9 @@ assert_v5_disconnect_reason_code(Config, ReasonCode) ->
after ?TIMEOUT -> ct:fail("missing DISCONNECT packet from server")
end
end.

set_durable_queue_type(Config) ->
ok = rpc(Config, application, set_env, [rabbitmq_mqtt, durable_queue_type, quorum]).

unset_durable_queue_type(Config) ->
ok = rpc(Config, application, unset_env, [rabbitmq_mqtt, durable_queue_type]).
3 changes: 3 additions & 0 deletions deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,6 @@ duplicate_client_id(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
publish_to_all_queue_types_qos0(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
publish_to_all_queue_types_qos1(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
maintenance(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_classic_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_quorum_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
notify_consumer_qos0_queue_deleted(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config).
Loading