Skip to content

Commit 1b75bad

Browse files
committed
Add AMQP 1.0 -> MQTT 5.0 -> AMQP 1.0 test
1 parent 390d571 commit 1b75bad

File tree

1 file changed

+70
-8
lines changed

1 file changed

+70
-8
lines changed

deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ all() ->
3131
groups() ->
3232
[{tests, [shuffle],
3333
[
34-
amqpl,
35-
amqp,
36-
stomp,
37-
stream
34+
mqtt_amqpl_mqtt,
35+
mqtt_amqp_mqtt,
36+
amqp_mqtt_amqp,
37+
mqtt_stomp_mqtt,
38+
mqtt_stream
3839
]
3940
}].
4041

@@ -84,7 +85,7 @@ end_per_testcase(Testcase, Config) ->
8485
%% Testsuite cases
8586
%% -------------------------------------------------------------------
8687

87-
amqpl(Config) ->
88+
mqtt_amqpl_mqtt(Config) ->
8889
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
8990
Ch = rabbit_ct_client_helpers:open_channel(Config),
9091
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
@@ -148,7 +149,7 @@ amqpl(Config) ->
148149

149150
ok = emqtt:disconnect(C).
150151

151-
amqp(Config) ->
152+
mqtt_amqp_mqtt(Config) ->
152153
Host = ?config(rmq_hostname, Config),
153154
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
154155
ClientId = Container = atom_to_binary(?FUNCTION_NAME),
@@ -268,7 +269,68 @@ amqp(Config) ->
268269
end,
269270
ok = emqtt:disconnect(C).
270271

271-
stomp(Config) ->
272+
amqp_mqtt_amqp(Config) ->
273+
Correlation = QName = ClientId = Container = atom_to_binary(?FUNCTION_NAME),
274+
275+
C = connect(ClientId, Config),
276+
{ok, _, [1]} = emqtt:subscribe(C, <<"t/1">>, qos1),
277+
278+
Host = ?config(rmq_hostname, Config),
279+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
280+
OpnConf = #{address => Host,
281+
port => Port,
282+
container_id => Container,
283+
sasl => {plain, <<"guest">>, <<"guest">>}},
284+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
285+
{ok, Session} = amqp10_client:begin_session(Connection),
286+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"pair">>),
287+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}),
288+
ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"t.2">>, #{}),
289+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
290+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, <<"/queue/", QName/binary>>),
291+
292+
%% AMQP 1.0 to MQTT 5.0
293+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, <<"/exchange/amq.topic/key/t.1">>),
294+
receive {amqp10_event, {link, Sender, credited}} -> ok
295+
after 2000 -> ct:fail(credited_timeout)
296+
end,
297+
RequestBody = <<"my request">>,
298+
299+
Msg1 = amqp10_msg:set_headers(
300+
#{durable => true},
301+
amqp10_msg:set_properties(
302+
#{correlation_id => Correlation,
303+
reply_to => <<"/exchange/amq.topic/key/t.2">>},
304+
amqp10_msg:new(<<>>, RequestBody, true))),
305+
ok = amqp10_client:send_msg(Sender, Msg1),
306+
307+
RespTopic = receive {publish, MqttMsg} ->
308+
ct:pal("Received MQTT message:~n~p", [MqttMsg]),
309+
#{client_pid := C,
310+
qos := 1,
311+
topic := <<"t/1">>,
312+
payload := RequestBody,
313+
properties := #{'Correlation-Data' := Correlation,
314+
'Response-Topic' := ResponseTopic}} = MqttMsg,
315+
ResponseTopic
316+
after 2000 -> ct:fail("did not receive request")
317+
end,
318+
319+
%% MQTT 5.0 to AMQP 1.0
320+
RespBody = <<"my response">>,
321+
{ok, _} = emqtt:publish(C, RespTopic,
322+
#{'Correlation-Data' => Correlation},
323+
RespBody, [{qos, 1}]),
324+
325+
{ok, Msg2} = amqp10_client:get_msg(Receiver),
326+
ct:pal("Received AMQP 1.0 message:~n~p", [Msg2]),
327+
?assertEqual(RespBody, amqp10_msg:body_bin(Msg2)),
328+
329+
ok = emqtt:disconnect(C),
330+
ok = amqp10_client:end_session(Session),
331+
ok = amqp10_client:close_connection(Connection).
332+
333+
mqtt_stomp_mqtt(Config) ->
272334
{ok, StompC0} = stomp_connect(Config),
273335
ok = stomp_send(StompC0, "SUBSCRIBE", [{"destination", "/topic/t.1"},
274336
{"receipt", "my-receipt"},
@@ -353,7 +415,7 @@ stomp(Config) ->
353415

354416
%% The stream test case is one-way because an MQTT client can publish to a stream,
355417
%% but not consume (directly) from a stream.
356-
stream(Config) ->
418+
mqtt_stream(Config) ->
357419
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
358420
Ch = rabbit_ct_client_helpers:open_channel(Config),
359421

0 commit comments

Comments
 (0)