Skip to content

Commit c738643

Browse files
committed
mqtt_shared_SUITE
1 parent 2d1e7ea commit c738643

File tree

1 file changed

+45
-27
lines changed

1 file changed

+45
-27
lines changed

deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,14 @@ end_per_testcase(Testcase, Config) ->
222222
end_per_testcase0(Testcase, Config) ->
223223
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
224224
%% Assert that every testcase cleaned up their MQTT sessions.
225+
_ = rpc(Config, ?MODULE, delete_queues, []),
225226
eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))),
226227
rabbit_ct_helpers:testcase_finished(Config, Testcase).
227228

229+
delete_queues() ->
230+
[catch rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
231+
|| Q <- rabbit_amqqueue:list()].
232+
228233
%% -------------------------------------------------------------------
229234
%% Testsuite cases
230235
%% -------------------------------------------------------------------
@@ -315,15 +320,16 @@ decode_basic_properties(Config) ->
315320
{ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1),
316321
QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]),
317322
?assertEqual(1, length(QuorumQueues)),
318-
Ch = rabbit_ct_client_helpers:open_channel(Config),
323+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
319324
amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>,
320325
routing_key = Topic},
321326
#amqp_msg{payload = Payload}),
322327
ok = expect_publishes(C1, Topic, [Payload]),
323328
ok = emqtt:disconnect(C1),
324329
C2 = connect(ClientId, Config, [{clean_start, true}]),
325330
ok = emqtt:disconnect(C2),
326-
ok = rpc(Config, application, unset_env, [App, Par]).
331+
ok = rpc(Config, application, unset_env, [App, Par]),
332+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
327333

328334
quorum_queue_rejects(Config) ->
329335
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
@@ -376,7 +382,7 @@ publish_to_all_queue_types_qos1(Config) ->
376382
publish_to_all_queue_types(Config, qos1).
377383

378384
publish_to_all_queue_types(Config, QoS) ->
379-
Ch = rabbit_ct_client_helpers:open_channel(Config),
385+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
380386

381387
CQ = <<"classic-queue">>,
382388
QQ = <<"quorum-queue">>,
@@ -428,7 +434,8 @@ publish_to_all_queue_types(Config, QoS) ->
428434
delete_queue(Ch, [CQ, QQ, SQ]),
429435
ok = emqtt:disconnect(C),
430436
?awaitMatch([],
431-
all_connection_pids(Config), 10_000, 1000).
437+
all_connection_pids(Config), 10_000, 1000),
438+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
432439

433440
publish_to_all_non_deprecated_queue_types_qos0(Config) ->
434441
publish_to_all_non_deprecated_queue_types(Config, qos0).
@@ -437,7 +444,7 @@ publish_to_all_non_deprecated_queue_types_qos1(Config) ->
437444
publish_to_all_non_deprecated_queue_types(Config, qos1).
438445

439446
publish_to_all_non_deprecated_queue_types(Config, QoS) ->
440-
Ch = rabbit_ct_client_helpers:open_channel(Config),
447+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
441448

442449
CQ = <<"classic-queue">>,
443450
QQ = <<"quorum-queue">>,
@@ -487,7 +494,8 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) ->
487494
delete_queue(Ch, [CQ, QQ, SQ]),
488495
ok = emqtt:disconnect(C),
489496
?awaitMatch([],
490-
all_connection_pids(Config), 10_000, 1000).
497+
all_connection_pids(Config), 10_000, 1000),
498+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
491499

492500
%% This test case does not require multiple nodes
493501
%% but it is grouped together with flow test cases for other queue types
@@ -519,7 +527,7 @@ flow(Config, {App, Par, Val}, QueueType)
519527
Result = rpc_all(Config, application, set_env, [App, Par, Val]),
520528
?assert(lists:all(fun(R) -> R =:= ok end, Result)),
521529

522-
Ch = rabbit_ct_client_helpers:open_channel(Config),
530+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
523531
QueueName = Topic = atom_to_binary(?FUNCTION_NAME),
524532
declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]),
525533
bind(Ch, QueueName, Topic),
@@ -547,7 +555,8 @@ flow(Config, {App, Par, Val}, QueueType)
547555
?awaitMatch([],
548556
all_connection_pids(Config), 10_000, 1000),
549557
?assertEqual(Result,
550-
rpc_all(Config, application, set_env, [App, Par, DefaultVal])).
558+
rpc_all(Config, application, set_env, [App, Par, DefaultVal])),
559+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
551560

552561
events(Config) ->
553562
ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder),
@@ -791,9 +800,10 @@ queue_down_qos1(Config) ->
791800
ok = rabbit_ct_broker_helpers:start_node(Config, 1)
792801
end,
793802

794-
Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0),
803+
{Conn, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
795804
delete_queue(Ch0, CQ),
796-
ok = emqtt:disconnect(C).
805+
ok = emqtt:disconnect(C),
806+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch0).
797807

798808
%% Consuming classic queue on a different node goes down.
799809
consuming_classic_queue_down(Config) ->
@@ -832,7 +842,7 @@ consuming_classic_queue_down(Config) ->
832842
ok.
833843

834844
delete_create_queue(Config) ->
835-
Ch = rabbit_ct_client_helpers:open_channel(Config),
845+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
836846
CQ1 = <<"classic-queue-1-delete-create">>,
837847
CQ2 = <<"classic-queue-2-delete-create">>,
838848
QQ = <<"quorum-queue-delete-create">>,
@@ -892,7 +902,8 @@ delete_create_queue(Config) ->
892902
1000, 10),
893903

894904
delete_queue(Ch, [CQ1, CQ2, QQ]),
895-
ok = emqtt:disconnect(C).
905+
ok = emqtt:disconnect(C),
906+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
896907

897908
session_expiry(Config) ->
898909
App = rabbitmq_mqtt,
@@ -1088,28 +1099,30 @@ large_message_amqp_to_mqtt(Config) ->
10881099
C = connect(ClientId, Config),
10891100
{ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}),
10901101

1091-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1102+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
10921103
Payload0 = binary:copy(<<"x">>, 8_000_000),
10931104
Payload = <<Payload0/binary, "y">>,
10941105
amqp_channel:call(Ch,
10951106
#'basic.publish'{exchange = <<"amq.topic">>,
10961107
routing_key = Topic},
10971108
#amqp_msg{payload = Payload}),
10981109
ok = expect_publishes(C, Topic, [Payload]),
1099-
ok = emqtt:disconnect(C).
1110+
ok = emqtt:disconnect(C),
1111+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
11001112

11011113
amqp_to_mqtt_qos0(Config) ->
11021114
Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME),
11031115
C = connect(ClientId, Config),
11041116
{ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}),
11051117

1106-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1118+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
11071119
amqp_channel:call(Ch,
11081120
#'basic.publish'{exchange = <<"amq.topic">>,
11091121
routing_key = Topic},
11101122
#amqp_msg{payload = Payload}),
11111123
ok = expect_publishes(C, Topic, [Payload]),
1112-
ok = emqtt:disconnect(C).
1124+
ok = emqtt:disconnect(C),
1125+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
11131126

11141127
%% Packet identifier is a non zero two byte integer.
11151128
%% Test that the server wraps around the packet identifier.
@@ -1590,7 +1603,7 @@ rabbit_status_connection_count(Config) ->
15901603
trace(Config) ->
15911604
Server = atom_to_binary(get_node_config(Config, 0, nodename)),
15921605
Topic = Payload = TraceQ = atom_to_binary(?FUNCTION_NAME),
1593-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1606+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
15941607
declare_queue(Ch, TraceQ, []),
15951608
#'queue.bind_ok'{} = amqp_channel:call(
15961609
Ch, #'queue.bind'{queue = TraceQ,
@@ -1645,11 +1658,12 @@ trace(Config) ->
16451658
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
16461659

16471660
delete_queue(Ch, TraceQ),
1648-
[ok = emqtt:disconnect(C) || C <- [Pub, Sub]].
1661+
[ok = emqtt:disconnect(C) || C <- [Pub, Sub]],
1662+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
16491663

16501664
trace_large_message(Config) ->
16511665
TraceQ = <<"trace-queue">>,
1652-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1666+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
16531667
declare_queue(Ch, TraceQ, []),
16541668
#'queue.bind_ok'{} = amqp_channel:call(
16551669
Ch, #'queue.bind'{queue = TraceQ,
@@ -1674,7 +1688,8 @@ trace_large_message(Config) ->
16741688

16751689
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]),
16761690
delete_queue(Ch, TraceQ),
1677-
ok = emqtt:disconnect(C).
1691+
ok = emqtt:disconnect(C),
1692+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
16781693

16791694
max_packet_size_unauthenticated(Config) ->
16801695
ClientId = ?FUNCTION_NAME,
@@ -1765,7 +1780,7 @@ default_queue_type(Config) ->
17651780
incoming_message_interceptors(Config) ->
17661781
Key = ?FUNCTION_NAME,
17671782
ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]),
1768-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1783+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
17691784
Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME),
17701785
CQName = <<"my classic queue">>,
17711786
Stream = <<"my stream">>,
@@ -1813,7 +1828,8 @@ incoming_message_interceptors(Config) ->
18131828
delete_queue(Ch, Stream),
18141829
delete_queue(Ch, CQName),
18151830
true = rpc(Config, persistent_term, erase, [Key]),
1816-
ok = emqtt:disconnect(C).
1831+
ok = emqtt:disconnect(C),
1832+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
18171833

18181834
%% This test makes sure that a retained message that got written in 3.12 or earlier
18191835
%% can be consumed in 3.13 or later.
@@ -1853,7 +1869,7 @@ bind_exchange_to_exchange(Config) ->
18531869
SourceX = <<"amq.topic">>,
18541870
DestinationX = <<"destination">>,
18551871
Q = <<"q">>,
1856-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1872+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
18571873
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
18581874
durable = true,
18591875
auto_delete = true}),
@@ -1871,13 +1887,14 @@ bind_exchange_to_exchange(Config) ->
18711887
eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
18721888
amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
18731889
#'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}),
1874-
ok = emqtt:disconnect(C).
1890+
ok = emqtt:disconnect(C),
1891+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
18751892

18761893
bind_exchange_to_exchange_single_message(Config) ->
18771894
SourceX = <<"amq.topic">>,
18781895
DestinationX = <<"destination">>,
18791896
Q = <<"q">>,
1880-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1897+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
18811898
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX,
18821899
durable = true,
18831900
auto_delete = true}),
@@ -1904,7 +1921,8 @@ bind_exchange_to_exchange_single_message(Config) ->
19041921
timer:sleep(10),
19051922
?assertEqual(#'queue.delete_ok'{message_count = 0},
19061923
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
1907-
ok = emqtt:disconnect(C).
1924+
ok = emqtt:disconnect(C),
1925+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch).
19081926

19091927
%% -------------------------------------------------------------------
19101928
%% Internal helpers
@@ -1936,7 +1954,7 @@ await_confirms_unordered(From, Left) ->
19361954
end.
19371955

19381956
await_consumer_count(ConsumerCount, ClientId, QoS, Config) ->
1939-
Ch = rabbit_ct_client_helpers:open_channel(Config),
1957+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
19401958
QueueName = rabbit_mqtt_util:queue_name_bin(
19411959
rabbit_data_coercion:to_binary(ClientId), QoS),
19421960
eventually(

0 commit comments

Comments
 (0)