Skip to content

Commit 2bc033a

Browse files
committed
Add test case enable_ff
1 parent 12e3fcc commit 2bc033a

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ groups() ->
4040
[
4141
{cluster_size_1, [shuffle],
4242
[
43+
enable_ff,
4344
trace,
4445
failure_ack_mode,
4546
failure_multiple_consumers,
@@ -93,6 +94,115 @@ init_per_testcase(Testcase, Config) ->
9394
end_per_testcase(Testcase, Config) ->
9495
rabbit_ct_helpers:testcase_finished(Config, Testcase).
9596

97+
%% In mixed version mode, this test case tests enabling the feature flag
98+
%% while a client consumes from the volatile queue.
99+
%% Delete this test case when feature flag rabbitmq_4.2.0 becomes required.
100+
enable_ff(Config) ->
101+
RequestQueue = <<"request queue">>,
102+
RequestPayload = <<"my request">>,
103+
CorrelationId = <<"my correlation ID">>,
104+
RequesterCh = rabbit_ct_client_helpers:open_channel(Config),
105+
ResponderCh = rabbit_ct_client_helpers:open_channel(Config),
106+
107+
amqp_channel:subscribe(RequesterCh,
108+
#'basic.consume'{queue = ?REPLY_QUEUE,
109+
no_ack = true},
110+
self()),
111+
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
112+
end,
113+
114+
#'queue.declare_ok'{} = amqp_channel:call(
115+
RequesterCh,
116+
#'queue.declare'{queue = RequestQueue}),
117+
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
118+
amqp_channel:register_confirm_handler(RequesterCh, self()),
119+
120+
%% Send the request.
121+
amqp_channel:cast(
122+
RequesterCh,
123+
#'basic.publish'{routing_key = RequestQueue},
124+
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
125+
correlation_id = CorrelationId},
126+
payload = RequestPayload}),
127+
receive #'basic.ack'{} -> ok
128+
end,
129+
130+
%% Receive the request.
131+
{#'basic.get_ok'{},
132+
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
133+
correlation_id = CorrelationId},
134+
payload = RequestPayload}
135+
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
136+
137+
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
138+
message_count = 0,
139+
consumer_count = 1},
140+
amqp_channel:call(ResponderCh,
141+
#'queue.declare'{queue = ReplyTo})),
142+
143+
%% Send the first reply.
144+
amqp_channel:cast(
145+
ResponderCh,
146+
#'basic.publish'{routing_key = ReplyTo},
147+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
148+
payload = <<"reply 1">>}),
149+
150+
%% Receive the frst reply.
151+
receive {#'basic.deliver'{consumer_tag = CTag,
152+
redelivered = false,
153+
exchange = <<>>,
154+
routing_key = ReplyTo},
155+
#amqp_msg{payload = P1,
156+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
157+
?assertEqual(<<"reply 1">>, P1)
158+
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
159+
end,
160+
161+
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
162+
163+
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
164+
message_count = 0,
165+
consumer_count = 1},
166+
amqp_channel:call(ResponderCh,
167+
#'queue.declare'{queue = ReplyTo})),
168+
169+
%% Send the second reply.
170+
amqp_channel:cast(
171+
ResponderCh,
172+
#'basic.publish'{routing_key = ReplyTo},
173+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
174+
payload = <<"reply 2">>}),
175+
176+
%% Receive the second reply.
177+
receive {#'basic.deliver'{consumer_tag = CTag},
178+
#amqp_msg{payload = P2,
179+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
180+
?assertEqual(<<"reply 2">>, P2)
181+
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
182+
end,
183+
184+
%% Requester cancels consumption.
185+
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
186+
amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})),
187+
188+
%% Responder checks again if the requester is still there.
189+
%% This time, the requester and its queue should be gone.
190+
try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of
191+
_ ->
192+
ct:fail("expected queue.declare to fail")
193+
catch exit:Reason ->
194+
?assertMatch(
195+
{{_, {_, _, <<"NOT_FOUND - no queue '",
196+
ReplyTo:(byte_size(ReplyTo))/binary,
197+
"' in vhost '/'">>}}, _},
198+
Reason)
199+
end,
200+
201+
%% Clean up.
202+
#'queue.delete_ok'{} = amqp_channel:call(RequesterCh,
203+
#'queue.delete'{queue = RequestQueue}),
204+
ok = rabbit_ct_client_helpers:close_channel(RequesterCh).
205+
96206
%% Test case for
97207
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11662
98208
trace(Config) ->

0 commit comments

Comments
 (0)