Skip to content

Commit 522760f

Browse files
committed
Store MQTT messages as non-durable if QoS 0
By default, when the 'durable' message container (mc) annotation is unset, messages are interpreted to be durable. Prior to this commit, MQTT messages that were sent with QoS 0 were stored durably in classic queues. This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp: If the message is durable, the durable mc annotation will not be set. If the message is non-durable, the durable mc annotation will be set to false. (cherry picked from commit e96125b)
1 parent 7e4f36f commit 522760f

File tree

3 files changed

+29
-5
lines changed

3 files changed

+29
-5
lines changed

deps/rabbitmq_mqtt/src/mc_mqtt.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ init(Msg = #mqtt_msg{qos = Qos,
2727
when is_integer(Qos) ->
2828
Anns0 = case Qos > 0 of
2929
true ->
30-
#{?ANN_DURABLE => true};
30+
#{};
3131
false ->
32-
#{}
32+
#{?ANN_DURABLE => false}
3333
end,
3434
Anns1 = case Props of
3535
#{'Message-Expiry-Interval' := Seconds} ->

deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ groups() ->
3737
mqtt_amqpl_alt,
3838
mqtt_amqp,
3939
mqtt_amqp_alt,
40-
amqp_mqtt
40+
amqp_mqtt,
41+
is_persistent
4142
]}
4243
].
4344

@@ -501,6 +502,19 @@ amqp_mqtt(_Config) ->
501502
}, Mqtt),
502503
ok.
503504

505+
is_persistent(_Config) ->
506+
Msg0 = #mqtt_msg{qos = 0,
507+
topic = <<"my/topic">>,
508+
payload = <<>>},
509+
Mc0 = mc:init(mc_mqtt, Msg0, #{}),
510+
?assertNot(mc:is_persistent(Mc0)),
511+
512+
Msg1 = #mqtt_msg{qos = 1,
513+
topic = <<"my/topic">>,
514+
payload = <<>>},
515+
Mc1 = mc:init(mc_mqtt, Msg1, #{}),
516+
?assert(mc:is_persistent(Mc1)).
517+
504518
mqtt_msg() ->
505519
#mqtt_msg{qos = 0,
506520
topic = <<"my/topic">>,

deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
-import(rabbit_ct_broker_helpers,
2323
[rpc/4]).
2424
-import(rabbit_ct_helpers,
25-
[eventually/3]).
25+
[eventually/1,
26+
eventually/3]).
2627

2728
all() ->
2829
[{group, tests}].
@@ -87,7 +88,8 @@ end_per_testcase(Testcase, Config) ->
8788
amqpl(Config) ->
8889
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
8990
Ch = rabbit_ct_client_helpers:open_channel(Config),
90-
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
91+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q,
92+
durable = true}),
9193
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
9294
exchange = <<"amq.topic">>,
9395
routing_key = <<"my.topic">>}),
@@ -146,6 +148,14 @@ amqpl(Config) ->
146148
after 1000 -> ct:fail("did not receive reply")
147149
end,
148150

151+
%% Another message MQTT 5.0 to AMQP 0.9.1, this time with QoS 0
152+
ok = emqtt:publish(C, <<"my/topic">>, RequestPayload, [{qos, 0}]),
153+
eventually(
154+
?_assertMatch(
155+
{#'basic.get_ok'{}, #amqp_msg{payload = RequestPayload,
156+
props = #'P_basic'{delivery_mode = 1}}},
157+
amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
158+
149159
ok = emqtt:disconnect(C).
150160

151161
amqp(Config) ->

0 commit comments

Comments
 (0)