Skip to content

Commit 9891c3c

Browse files
committed
wip: tests for client_id
1 parent c7c7cd1 commit 9891c3c

File tree

2 files changed

+119
-1
lines changed

2 files changed

+119
-1
lines changed

deps/rabbitmq_mqtt/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export BUILD_WITHOUT_QUIC
4343

4444
LOCAL_DEPS = ssl
4545
DEPS = ranch rabbit amqp10_common
46-
TEST_DEPS = cowlib emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation
46+
TEST_DEPS = cowlib emqtt ct_helper rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_management amqp_client rabbitmq_consistent_hash_exchange rabbitmq_amqp_client rabbitmq_stomp rabbitmq_stream rabbitmq_federation amqp10_client
4747

4848
PLT_APPS += rabbitmq_cli elixir
4949

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
-module(client_id_interceptor_SUITE).
2+
-compile([export_all,
3+
nowarn_export_all]).
4+
5+
-include_lib("common_test/include/ct.hrl").
6+
-include_lib("eunit/include/eunit.hrl").
7+
-include_lib("amqp_client/include/amqp_client.hrl").
8+
9+
-import(util,
10+
[connect/3]).
11+
12+
all() ->
13+
[{group, intercept}].
14+
15+
groups() ->
16+
[
17+
{intercept, [], [incoming]}
18+
].
19+
20+
init_per_suite(Config) ->
21+
rabbit_ct_helpers:log_environment(),
22+
rabbit_ct_helpers:run_setup_steps(Config).
23+
24+
end_per_suite(Config) ->
25+
rabbit_ct_helpers:run_teardown_steps(Config).
26+
27+
init_per_testcase(Testcase, Config0) ->
28+
Config1 = rabbit_ct_helpers:set_config(
29+
Config0, [{rmq_nodename_suffix, Testcase}]),
30+
Val = maps:to_list(
31+
maps:from_keys([rabbit_mqtt_message_interceptor_client_id],
32+
#{annotation_key => <<"x-client_id">>})),
33+
Config2 = rabbit_ct_helpers:merge_app_env(
34+
Config1, {rabbit, [{incoming_message_interceptors, Val}]}),
35+
Config3 = rabbit_ct_helpers:run_steps(
36+
Config2,
37+
rabbit_ct_broker_helpers:setup_steps() ++
38+
rabbit_ct_client_helpers:setup_steps() ++
39+
[fun start_amqp10_client_app/1]),
40+
rabbit_ct_helpers:testcase_started(Config3, Testcase).
41+
42+
end_per_testcase(Testcase, Config0) ->
43+
Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
44+
rabbit_ct_helpers:run_teardown_steps(
45+
Config,
46+
rabbit_ct_client_helpers:teardown_steps() ++
47+
rabbit_ct_broker_helpers:teardown_steps()).
48+
49+
start_amqp10_client_app(Config) ->
50+
?assertMatch({ok, _}, application:ensure_all_started(amqp10_client)),
51+
Config.
52+
53+
incoming(Config) ->
54+
Ch = rabbit_ct_client_helpers:open_channel(Config),
55+
56+
QQ = <<"qq1">>,
57+
Topic = <<"mytopic">>,
58+
59+
declare_queue(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
60+
bind(Ch, QQ, Topic),
61+
62+
ClientId = ?FUNCTION_NAME,
63+
C = connect(ClientId, Config, [{max_inflight, 200},
64+
{retry_interval, 2}]),
65+
66+
?_assertMatch({ok, _}, emqtt:publish(C, Topic, <<"1">>, [{qos, 1}])),
67+
?_assertMatch({#'basic.get_ok'{},
68+
#amqp_msg{props = #'P_basic'{headers = [{<<"x-client_id">>,
69+
longstr,
70+
<<"incoming">>}]} }},
71+
amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})),
72+
OpnConf = connection_config(Config),
73+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
74+
{ok, Session} = amqp10_client:begin_session(Connection),
75+
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"qq1">>),
76+
77+
receive M ->
78+
ct:log("Received message: ~p", [M]),
79+
ok
80+
after 5000 ->
81+
ct:log("Timeout waiting for message")
82+
end,
83+
84+
%% grant some credit to the remote sender but don't auto-renew it
85+
ok = amqp10_client:flow_link_credit(Receiver, 5, never),
86+
87+
%% wait for a delivery
88+
receive
89+
{amqp10_msg, Receiver, InMsg} ->
90+
ct:log("Received message: ~p", [InMsg]),
91+
ok
92+
after 2000 ->
93+
exit(delivery_timeout)
94+
end.
95+
96+
97+
declare_queue(Ch, QueueName, Args)
98+
when is_pid(Ch), is_binary(QueueName), is_list(Args) ->
99+
#'queue.declare_ok'{} = amqp_channel:call(
100+
Ch, #'queue.declare'{
101+
queue = QueueName,
102+
durable = true,
103+
arguments = Args}).
104+
105+
bind(Ch, QueueName, Topic)
106+
when is_pid(Ch), is_binary(QueueName), is_binary(Topic) ->
107+
#'queue.bind_ok'{} = amqp_channel:call(
108+
Ch, #'queue.bind'{queue = QueueName,
109+
exchange = <<"amq.topic">>,
110+
routing_key = Topic}).
111+
112+
connection_config(Config) ->
113+
Host = ?config(rmq_hostname, Config),
114+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
115+
#{address => Host,
116+
port => Port,
117+
container_id => <<"my container">>,
118+
sasl => {plain, <<"guest">>, <<"guest">>}}.

0 commit comments

Comments
 (0)