77-include_lib (" amqp_client/include/amqp_client.hrl" ).
88
99-import (util ,
10- [connect /3 ]).
10+ [connect /2 ]).
1111
1212all () ->
1313 [{group , intercept }].
@@ -22,21 +22,20 @@ init_per_suite(Config) ->
2222 rabbit_ct_helpers :run_setup_steps (Config ).
2323
2424end_per_suite (Config ) ->
25- rabbit_ct_helpers :run_teardown_steps (Config ).
26-
25+ Config .
2726init_per_testcase (Testcase , Config0 ) ->
2827 Config1 = rabbit_ct_helpers :set_config (
2928 Config0 , [{rmq_nodename_suffix , Testcase }]),
3029 Val = maps :to_list (
3130 maps :from_keys ([rabbit_mqtt_message_interceptor_client_id ],
3231 #{annotation_key => <<" x-client_id" >>})),
3332 Config2 = rabbit_ct_helpers :merge_app_env (
34- Config1 , {rabbit , [{incoming_message_interceptors , Val }]}),
33+ Config1 , {rabbit , [{incoming_message_interceptors , Val }]}),
3534 Config3 = rabbit_ct_helpers :run_steps (
36- Config2 ,
37- rabbit_ct_broker_helpers :setup_steps () ++
38- rabbit_ct_client_helpers :setup_steps () ++
39- [fun start_amqp10_client_app /1 ]),
35+ Config2 ,
36+ rabbit_ct_broker_helpers :setup_steps () ++
37+ rabbit_ct_client_helpers :setup_steps () ++
38+ [fun start_amqp10_client_app /1 ]),
4039 rabbit_ct_helpers :testcase_started (Config3 , Testcase ).
4140
4241end_per_testcase (Testcase , Config0 ) ->
@@ -51,68 +50,48 @@ start_amqp10_client_app(Config) ->
5150 Config .
5251
5352incoming (Config ) ->
53+ Host = ? config (rmq_hostname , Config ),
54+ Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
55+ ClientId = Container = atom_to_binary (? FUNCTION_NAME ),
56+
57+ % % With AMQP 1.0
58+ OpnConf = #{address => Host ,
59+ port => Port ,
60+ container_id => Container ,
61+ sasl => {plain , <<" guest" >>, <<" guest" >>}},
62+ {ok , Connection1 } = amqp10_client :open_connection (OpnConf ),
63+ {ok , Session1 } = amqp10_client :begin_session (Connection1 ),
64+ {ok , LinkPair } = rabbitmq_amqp_client :attach_management_link_pair_sync (Session1 , <<" pair" >>),
65+ QName = <<" queue for AMQP 1.0 client" >>,
66+ {ok , _ } = rabbitmq_amqp_client :declare_queue (LinkPair , QName , #{}),
67+ ok = rabbitmq_amqp_client :bind_queue (LinkPair , QName , <<" amq.topic" >>, <<" topic.1" >>, #{}),
68+ ok = rabbitmq_amqp_client :detach_management_link_pair_sync (LinkPair ),
69+ {ok , Receiver } = amqp10_client :attach_receiver_link (
70+ Session1 , <<" test-receiver" >>,
71+ rabbitmq_amqp_address :queue (QName ),
72+ unsettled , configuration ),
73+
74+ C = connect (ClientId , Config ),
75+ Correlation = <<" some correlation ID" >>,
76+ ContentType = <<" text/plain" >>,
77+ RequestPayload = <<" my request" >>,
78+ {ok , _ } = emqtt :publish (C , <<" topic/1" >>,
79+ #{'Content-Type' => ContentType ,
80+ 'Correlation-Data' => Correlation },
81+ RequestPayload , [{qos , 1 }]),
82+
83+ {ok , Msg1 } = amqp10_client :get_msg (Receiver ),
84+ Props = amqp10_msg :message_annotations (Msg1 ),
85+ ? assertMatch (ClientId , maps :get (<<" x-client_id" >>, Props )),
86+
87+ % With AMQP 0.9
5488 Ch = rabbit_ct_client_helpers :open_channel (Config ),
5589
56- QQ = <<" qq1" >>,
57- Topic = <<" mytopic" >>,
58-
59- declare_queue (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
60- bind (Ch , QQ , Topic ),
61-
62- ClientId = ? FUNCTION_NAME ,
63- C = connect (ClientId , Config , [{max_inflight , 200 },
64- {retry_interval , 2 }]),
65-
66- ? _assertMatch ({ok , _ }, emqtt :publish (C , Topic , <<" 1" >>, [{qos , 1 }])),
6790 ? _assertMatch ({# 'basic.get_ok' {},
6891 # amqp_msg {props = # 'P_basic' {headers = [{<<" x-client_id" >>,
6992 longstr ,
7093 <<" incoming" >>}]} }},
71- amqp_channel :call (Ch , # 'basic.get' {queue = QQ , no_ack = true })),
72- OpnConf = connection_config (Config ),
73- {ok , Connection } = amqp10_client :open_connection (OpnConf ),
74- {ok , Session } = amqp10_client :begin_session (Connection ),
75- {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" test-receiver" >>, <<" qq1" >>),
76-
77- receive M ->
78- ct :log (" Received message: ~p " , [M ]),
79- ok
80- after 5000 ->
81- ct :log (" Timeout waiting for message" )
82- end ,
94+ amqp_channel :call (Ch , # 'basic.get' {queue = QName , no_ack = true })),
8395
84- % % grant some credit to the remote sender but don't auto-renew it
85- ok = amqp10_client :flow_link_credit (Receiver , 5 , never ),
86-
87- % % wait for a delivery
88- receive
89- {amqp10_msg , Receiver , InMsg } ->
90- ct :log (" Received message: ~p " , [InMsg ]),
91- ok
92- after 2000 ->
93- exit (delivery_timeout )
94- end .
95-
96-
97- declare_queue (Ch , QueueName , Args )
98- when is_pid (Ch ), is_binary (QueueName ), is_list (Args ) ->
99- # 'queue.declare_ok' {} = amqp_channel :call (
100- Ch , # 'queue.declare' {
101- queue = QueueName ,
102- durable = true ,
103- arguments = Args }).
104-
105- bind (Ch , QueueName , Topic )
106- when is_pid (Ch ), is_binary (QueueName ), is_binary (Topic ) ->
107- # 'queue.bind_ok' {} = amqp_channel :call (
108- Ch , # 'queue.bind' {queue = QueueName ,
109- exchange = <<" amq.topic" >>,
110- routing_key = Topic }).
111-
112- connection_config (Config ) ->
113- Host = ? config (rmq_hostname , Config ),
114- Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
115- #{address => Host ,
116- port => Port ,
117- container_id => <<" my container" >>,
118- sasl => {plain , <<" guest" >>, <<" guest" >>}}.
96+ rabbit_ct_client_helpers :close_channel (Ch ),
97+ emqtt :disconnect (C ).
0 commit comments