1616
1717all () ->
1818 [
19+ {group , cluster_size_1 },
1920 {group , cluster_size_3 }
2021 ].
2122
2223groups () ->
2324 [
25+ {cluster_size_1 , [shuffle ],
26+ [
27+ trace
28+ ]},
2429 {cluster_size_3 , [shuffle ],
2530 [
2631 rpc_new_to_old_node ,
@@ -39,8 +44,11 @@ init_per_suite(Config) ->
3944end_per_suite (Config ) ->
4045 Config .
4146
42- init_per_group (_Group , Config ) ->
43- Nodes = 3 ,
47+ init_per_group (Group , Config ) ->
48+ Nodes = case Group of
49+ cluster_size_1 -> 1 ;
50+ cluster_size_3 -> 3
51+ end ,
4452 Suffix = rabbit_ct_helpers :testcase_absname (Config , " " , " -" ),
4553 Config1 = rabbit_ct_helpers :set_config (
4654 Config , [{rmq_nodes_count , Nodes },
@@ -62,6 +70,114 @@ init_per_testcase(Testcase, Config) ->
6270end_per_testcase (Testcase , Config ) ->
6371 rabbit_ct_helpers :testcase_finished (Config , Testcase ).
6472
73+ % % Test case for
74+ % % https://github.com/rabbitmq/rabbitmq-server/discussions/11662
75+ trace (Config ) ->
76+ {ok , _ } = rabbit_ct_broker_helpers :rabbitmqctl (Config , 0 , [" trace_on" ]),
77+
78+ Node = atom_to_binary (rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename )),
79+ TraceQueue = <<" tests.amqpl_direct_reply_to.trace.tracing" >>,
80+ RequestQueue = <<" tests.amqpl_direct_reply_to.trace.requests" >>,
81+ % % This is the pseudo queue that is specially interpreted by RabbitMQ.
82+ ReplyQueue = <<" amq.rabbitmq.reply-to" >>,
83+ RequestPayload = <<" my request" >>,
84+ ReplyPayload = <<" my reply" >>,
85+ CorrelationId = <<" my correlation ID" >>,
86+ Qs = [RequestQueue , TraceQueue ],
87+ Ch = rabbit_ct_client_helpers :open_channel (Config ),
88+ RequesterCh = rabbit_ct_client_helpers :open_channel (Config , 0 ),
89+ ResponderCh = rabbit_ct_client_helpers :open_channel (Config , 0 ),
90+
91+ [# 'queue.declare_ok' {} = amqp_channel :call (Ch , # 'queue.declare' {queue = Q0 }) || Q0 <- Qs ],
92+ # 'queue.bind_ok' {} = amqp_channel :call (
93+ Ch , # 'queue.bind' {
94+ queue = TraceQueue ,
95+ exchange = <<" amq.rabbitmq.trace" >>,
96+ % % We subscribe only to messages entering RabbitMQ.
97+ routing_key = <<" publish.#" >>}),
98+
99+ % % There is no need to declare this pseudo queue first.
100+ amqp_channel :subscribe (RequesterCh ,
101+ # 'basic.consume' {queue = ReplyQueue ,
102+ no_ack = true },
103+ self ()),
104+ CTag = receive # 'basic.consume_ok' {consumer_tag = CTag0 } -> CTag0
105+ end ,
106+ # 'confirm.select_ok' {} = amqp_channel :call (RequesterCh , # 'confirm.select' {}),
107+ amqp_channel :register_confirm_handler (RequesterCh , self ()),
108+
109+ % % Send the request.
110+ amqp_channel :cast (
111+ RequesterCh ,
112+ # 'basic.publish' {routing_key = RequestQueue },
113+ # amqp_msg {props = # 'P_basic' {reply_to = ReplyQueue ,
114+ correlation_id = CorrelationId },
115+ payload = RequestPayload }),
116+ receive # 'basic.ack' {} -> ok
117+ after 5000 -> ct :fail (confirm_timeout )
118+ end ,
119+
120+ % % Receive the request.
121+ {# 'basic.get_ok' {},
122+ # amqp_msg {props = # 'P_basic' {reply_to = ReplyTo ,
123+ correlation_id = CorrelationId },
124+ payload = RequestPayload }
125+ } = amqp_channel :call (ResponderCh , # 'basic.get' {queue = RequestQueue }),
126+
127+ % % Send the reply.
128+ amqp_channel :cast (
129+ ResponderCh ,
130+ # 'basic.publish' {routing_key = ReplyTo },
131+ # amqp_msg {props = # 'P_basic' {correlation_id = CorrelationId },
132+ payload = ReplyPayload }),
133+
134+ % % Receive the reply.
135+ receive {# 'basic.deliver' {consumer_tag = CTag },
136+ # amqp_msg {payload = ReplyPayload ,
137+ props = # 'P_basic' {correlation_id = CorrelationId }}} ->
138+ ok
139+ after 5000 -> ct :fail (missing_reply )
140+ end ,
141+
142+ % % 2 messages should have entered RabbitMQ:
143+ % % 1. the RPC request
144+ % % 2. the RPC reply
145+
146+ {# 'basic.get_ok' {routing_key = <<" publish." >>},
147+ # amqp_msg {props = # 'P_basic' {headers = RequestHeaders },
148+ payload = RequestPayload }
149+ } = amqp_channel :call (Ch , # 'basic.get' {queue = TraceQueue }),
150+ ? assertMatch (#{
151+ <<" exchange_name" >> := <<>>,
152+ <<" routing_keys" >> := [RequestQueue ],
153+ <<" connection" >> := <<" 127.0.0.1:" , _ /binary >>,
154+ <<" node" >> := Node ,
155+ <<" vhost" >> := <<" /" >>,
156+ <<" user" >> := <<" guest" >>,
157+ <<" properties" >> := #{<<" correlation_id" >> := CorrelationId },
158+ <<" routed_queues" >> := [RequestQueue ]
159+ },
160+ rabbit_misc :amqp_table (RequestHeaders )),
161+
162+ {# 'basic.get_ok' {routing_key = <<" publish." >>},
163+ # amqp_msg {props = # 'P_basic' {headers = ResponseHeaders },
164+ payload = ReplyPayload }
165+ } = amqp_channel :call (Ch , # 'basic.get' {queue = TraceQueue }),
166+ ? assertMatch (#{
167+ <<" exchange_name" >> := <<>>,
168+ <<" routing_keys" >> := [<<" amq.rabbitmq.reply-to." , _ /binary >>],
169+ <<" connection" >> := <<" 127.0.0.1:" , _ /binary >>,
170+ <<" node" >> := Node ,
171+ <<" vhost" >> := <<" /" >>,
172+ <<" user" >> := <<" guest" >>,
173+ <<" properties" >> := #{<<" correlation_id" >> := CorrelationId },
174+ <<" routed_queues" >> := [<<" amq.rabbitmq.reply-to." , _ /binary >>]
175+ },
176+ rabbit_misc :amqp_table (ResponseHeaders )),
177+
178+ [# 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = Q0 }) || Q0 <- Qs ],
179+ {ok , _ } = rabbit_ct_broker_helpers :rabbitmqctl (Config , 0 , [" trace_off" ]).
180+
65181% % "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
66182rpc_new_to_old_node (Config ) ->
67183 rpc (0 , 1 , Config ).
@@ -70,7 +186,7 @@ rpc_old_to_new_node(Config) ->
70186 rpc (1 , 0 , Config ).
71187
72188rpc (RequesterNode , ResponderNode , Config ) ->
73- RequestQueue = <<" my request queue " >>,
189+ RequestQueue = <<" tests.amqpl_direct_reply_to.rpc.requests " >>,
74190 % % This is the pseudo queue that is specially interpreted by RabbitMQ.
75191 ReplyQueue = <<" amq.rabbitmq.reply-to" >>,
76192 RequestPayload = <<" my request" >>,
0 commit comments