Skip to content

Commit f485e51

Browse files
committed
Fix Native MQTT crash if properties encoded
Fixes #8252 The MQTT connection must decode AMQP 0.9.1 properties as they are getting encoded for example in: https://github.com/rabbitmq/rabbitmq-server/blob/712c2b9ec96dca4f07f14dfed9c6028ef0e748fd/deps/rabbit/src/rabbit_variable_queue.erl#L2219 and https://github.com/rabbitmq/rabbitmq-server/blob/712c2b9ec96dca4f07f14dfed9c6028ef0e748fd/deps/rabbit/src/rabbit_quorum_queue.erl#L1680 Prior to this commit, the MQTT connection process could crash with: ``` [{rabbit_mqtt_processor,deliver_one_to_client, [{{resource,<<"/">>,queue, <<"mqtt-subscription-mqtt-explorer-c5351d21qos0">>}, <0.4546.0>,undefined,false, {basic_message, {resource,<<"/">>,exchange,<<"amq.topic">>}, [<<"plant.v1.M3.BCD423.rev.fillStateChangedEvent">>], {content,60,none, <<80,64,16,97,112,112,108,105,99,97,116,105,111,110,47, 106,115,111,110,2,0,0,0,0,100,103,141,186>>, rabbit_framing_amqp_0_9_1, [<<"{\"plantIdentificationCode\":\"M3/BCD423/rev\",\"isFullSensorTriggered\":false,\"numberOfCarriers\":20,\"maxNumberOfCarriers\":1174,\"numberOfEmpties\":0,\"numberOfCarriersWithPayload\":20,\"numberOfCarriersWithOrder\":0,\"trackLength\":30000,\"trackLengthOccupied\":1130}">>]}, <<31,230,178,158,209,240,53,221,100,60,64,5,227,237,58,21>>, true}}, false, {state, {cfg,#Port<0.282>,mqtt311,true,undefined, {resource,<<"/">>,exchange,<<"amq.topic">>}, undefined,false,none,<0.680.0>,flow,none,10,<<"/">>, <<"mqtt-explorer-c5351d21">>,undefined, {192,168,10,131}, 1883, {192,168,10,130}, 53244,1684508087392,#Fun<rabbit_mqtt_reader.0.106886>}, {rabbit_queue_type, #{{resource,<<"/">>,queue, <<"mqtt-subscription-mqtt-explorer-c5351d21qos0">>} => {ctx,rabbit_classic_queue, {rabbit_classic_queue,<0.4546.0>,#{}, #{<0.4546.0> => ok}}}}}, #{},#{},1, #{<<"#">> => 0,<<"$SYS/#">> => 0}, {auth_state, {user,<<"rabbit">>,[], [{rabbit_auth_backend_internal, #Fun<rabbit_auth_backend_internal.3.114557357>}]}, #{<<"client_id">> => <<"mqtt-explorer-c5351d21">>}}, registered,#{},0}], [{file,"rabbit_mqtt_processor.erl"},{line,1414}]}, {lists,foldl,3,[{file,"lists.erl"},{line,1350}]}, {rabbit_mqtt_processor,handle_queue_event,2, [{file,"rabbit_mqtt_processor.erl"},{line,1345}]}, {rabbit_mqtt_reader,handle_cast,2, [{file,"rabbit_mqtt_reader.erl"},{line,134}]}, {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,1123}]}, {gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,1200}]}, {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]} ```
1 parent 712c2b9 commit f485e51

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1413,8 +1413,10 @@ deliver_to_client(Msgs, Ack, State) ->
14131413
end, State, Msgs).
14141414

14151415
deliver_one_to_client(Msg = {QNameOrType, QPid, QMsgId, _Redelivered,
1416-
#basic_message{content = #content{properties = #'P_basic'{headers = Headers}}}},
1416+
#basic_message{content = Content}},
14171417
AckRequired, State0) ->
1418+
#content{properties = #'P_basic'{headers = Headers}} =
1419+
rabbit_binary_parser:ensure_content_decoded(Content),
14181420
PublisherQoS = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
14191421
{byte, QoS0} ->
14201422
QoS0;

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ subgroups() ->
7373
,pubsub_separate_connections
7474
,will_with_disconnect
7575
,will_without_disconnect
76+
,decode_basic_properties
7677
,quorum_queue_rejects
7778
,events
7879
,internal_event_handler
@@ -278,6 +279,27 @@ will_without_disconnect(Config) ->
278279

279280
ok = emqtt:disconnect(Sub).
280281

282+
%% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
283+
%% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
284+
decode_basic_properties(Config) ->
285+
App = rabbitmq_mqtt,
286+
Par = durable_queue_type,
287+
ok = rpc(Config, application, set_env, [App, Par, quorum]),
288+
ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME),
289+
C1 = connect(ClientId, Config, [{clean_start, false}]),
290+
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
291+
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
292+
?assertEqual(1, length(QuorumQueues)),
293+
Ch = rabbit_ct_client_helpers:open_channel(Config),
294+
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
295+
routing_key = Topic},
296+
#amqp_msg{payload = Payload}),
297+
ok = expect_publishes(C1, Topic, [Payload]),
298+
ok = emqtt:disconnect(C1),
299+
C2 = connect(ClientId, Config, [{clean_start, true}]),
300+
ok = emqtt:disconnect(C2),
301+
ok = rpc(Config, application, unset_env, [App, Par]).
302+
281303
quorum_queue_rejects(Config) ->
282304
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
283305
Name = atom_to_binary(?FUNCTION_NAME),

0 commit comments

Comments
 (0)