Skip to content

Commit a6393ba

Browse files
Merge pull request #8254 from rabbitmq/mqtt-decode-props
Fix Native MQTT crash if properties encoded
2 parents 712c2b9 + f485e51 commit a6393ba

File tree

2 files changed

+25
-1
lines changed

2 files changed

+25
-1
lines changed

deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1413,8 +1413,10 @@ deliver_to_client(Msgs, Ack, State) ->
14131413
end, State, Msgs).
14141414

14151415
deliver_one_to_client(Msg = {QNameOrType, QPid, QMsgId, _Redelivered,
1416-
#basic_message{content = #content{properties = #'P_basic'{headers = Headers}}}},
1416+
#basic_message{content = Content}},
14171417
AckRequired, State0) ->
1418+
#content{properties = #'P_basic'{headers = Headers}} =
1419+
rabbit_binary_parser:ensure_content_decoded(Content),
14181420
PublisherQoS = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
14191421
{byte, QoS0} ->
14201422
QoS0;

deps/rabbitmq_mqtt/test/shared_SUITE.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ subgroups() ->
7373
,pubsub_separate_connections
7474
,will_with_disconnect
7575
,will_without_disconnect
76+
,decode_basic_properties
7677
,quorum_queue_rejects
7778
,events
7879
,internal_event_handler
@@ -278,6 +279,27 @@ will_without_disconnect(Config) ->
278279

279280
ok = emqtt:disconnect(Sub).
280281

282+
%% Test that an MQTT connection decodes the AMQP 0.9.1 'P_basic' properties.
283+
%% see https://github.com/rabbitmq/rabbitmq-server/discussions/8252
284+
decode_basic_properties(Config) ->
285+
App = rabbitmq_mqtt,
286+
Par = durable_queue_type,
287+
ok = rpc(Config, application, set_env, [App, Par, quorum]),
288+
ClientId = Topic = Payload = atom_to_binary(?FUNCTION_NAME),
289+
C1 = connect(ClientId, Config, [{clean_start, false}]),
290+
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
291+
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
292+
?assertEqual(1, length(QuorumQueues)),
293+
Ch = rabbit_ct_client_helpers:open_channel(Config),
294+
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
295+
routing_key = Topic},
296+
#amqp_msg{payload = Payload}),
297+
ok = expect_publishes(C1, Topic, [Payload]),
298+
ok = emqtt:disconnect(C1),
299+
C2 = connect(ClientId, Config, [{clean_start, true}]),
300+
ok = emqtt:disconnect(C2),
301+
ok = rpc(Config, application, unset_env, [App, Par]).
302+
281303
quorum_queue_rejects(Config) ->
282304
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
283305
Name = atom_to_binary(?FUNCTION_NAME),

0 commit comments

Comments
 (0)