diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index b6cae214c8c3..27f4e9cf81e1 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -426,7 +426,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0, undefined -> Props2; Ttl -> - case maps:get(?ANN_TIMESTAMP, Anns) of + case maps:get(?ANN_TIMESTAMP, Anns, undefined) of undefined -> Props2; Timestamp -> diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index 14d88f357602..c6fd2018f47f 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -33,7 +33,8 @@ groups() -> mqtt_amqp, mqtt_amqp_alt, amqp_mqtt, - is_persistent + is_persistent, + amqpl_to_mqtt_gh_12707 ]} ]. @@ -156,6 +157,34 @@ roundtrip_amqpl(_Config) -> ExpectedUserProperty = lists:keysort(1, UserProperty), ?assertMatch(#{'User-Property' := ExpectedUserProperty}, Props). +amqpl_to_mqtt_gh_12707(_Config) -> + Props = #'P_basic'{content_type = <<"text/plain">>, + content_encoding = <<"gzip">>, + headers = [], + delivery_mode = 1, + priority = 7, + correlation_id = <<"gh_12707-corr">> , + reply_to = <<"reply-to">>, + expiration = <<"12707">>, + message_id = <<"msg-id">>, + timestamp = 99, + type = <<"45">>, + user_id = <<"gh_12707">>, + app_id = <<"rmq">> + }, + Payload = [<<"gh_12707">>], + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + Content = #content{properties = Props, + payload_fragments_rev = Payload}, + Anns = #{ + ?ANN_EXCHANGE => <<"amq.topic">>, + ?ANN_ROUTING_KEYS => [<<"dummy">>] + }, + OriginalMsg = mc:init(mc_amqpl, Content, Anns), + Converted = mc:convert(mc_mqtt, OriginalMsg), + ?assertEqual(12707, mc:get_annotation(ttl, Converted)). + %% Non-UTF-8 Correlation Data should also be converted (via AMQP 0.9.1 header x-correlation-id). roundtrip_amqpl_correlation(_Config) -> Msg0 = mqtt_msg(), diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 249e335e2afd..28bdfa0d0513 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -35,6 +35,7 @@ groups() -> [{cluster_size_1, [shuffle], [ mqtt_amqpl_mqtt, + amqpl_mqtt_gh_12707, mqtt_amqp_mqtt, amqp_mqtt_amqp, mqtt_stomp_mqtt, @@ -169,6 +170,39 @@ mqtt_amqpl_mqtt(Config) -> ok = emqtt:disconnect(C). +amqpl_mqtt_gh_12707(Config) -> + Q = ClientId = atom_to_binary(?FUNCTION_NAME), + Ch = rabbit_ct_client_helpers:open_channel(Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q, + exchange = <<"amq.topic">>, + routing_key = <<"gh12707">>}), + C = connect(ClientId, Config), + + Topic = <<"gh12707">>, + Payload = <<"gh_12707">>, + + {ok, _, [1]} = emqtt:subscribe(C, #{'Subscription-Identifier' => 676}, [{Topic, [{qos, 1}]}]), + amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>, + routing_key = Topic}, + #amqp_msg{payload = Payload, + props = #'P_basic'{content_type = <<"application/json">>, + expiration = <<"12707">>, + headers = []}}), + + receive {publish, + #{topic := MqttTopic, + payload := MqttPayload}} -> + ?assertEqual(MqttTopic, Topic), + ?assertEqual(Payload, MqttPayload) + after 1000 -> + ct:fail("did not receive a delivery") + end, + + amqp_channel:call(Ch, #'queue.delete'{queue = Q}), + rabbit_ct_client_helpers:close_channel(Ch), + ok = emqtt:disconnect(C). + mqtt_amqp_mqtt(Config) -> Host = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),