Skip to content

Commit 9aead0d

Browse files
committed
Add test case enable_ff
1 parent 01789db commit 9aead0d

File tree

1 file changed

+146
-14
lines changed

1 file changed

+146
-14
lines changed

deps/rabbit/test/direct_reply_to_amqpl_SUITE.erl

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,28 +33,42 @@
3333
all() ->
3434
[
3535
{group, cluster_size_1},
36+
{group, cluster_size_1_ff_disabled},
3637
{group, cluster_size_3}
3738
].
3839

3940
groups() ->
4041
[
4142
{cluster_size_1, [shuffle],
43+
cluster_size_1_common() ++
4244
[
43-
trace,
44-
failure_ack_mode,
45-
failure_multiple_consumers,
46-
failure_reuse_consumer_tag,
47-
failure_publish,
4845
amqpl_amqp_amqpl,
4946
amqp_amqpl_amqp
5047
]},
48+
49+
%% Delete this group when feature flag rabbitmq_4.2.0 becomes required.
50+
{cluster_size_1_ff_disabled, [],
51+
cluster_size_1_common() ++
52+
[
53+
enable_ff % must run last
54+
]},
55+
5156
{cluster_size_3, [shuffle],
5257
[
5358
rpc_new_to_old_node,
5459
rpc_old_to_new_node
5560
]}
5661
].
5762

63+
cluster_size_1_common() ->
64+
[
65+
trace,
66+
failure_ack_mode,
67+
failure_multiple_consumers,
68+
failure_reuse_consumer_tag,
69+
failure_publish
70+
].
71+
5872
%% -------------------------------------------------------------------
5973
%% Testsuite setup/teardown.
6074
%% -------------------------------------------------------------------
@@ -69,15 +83,27 @@ end_per_suite(Config) ->
6983

7084
init_per_group(Group, Config) ->
7185
Nodes = case Group of
72-
cluster_size_1 -> 1;
73-
cluster_size_3 -> 3
86+
cluster_size_3 ->
87+
3;
88+
_ ->
89+
1
7490
end,
75-
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
76-
Config1 = rabbit_ct_helpers:set_config(
77-
Config, [{rmq_nodes_count, Nodes},
78-
{rmq_nodename_suffix, Suffix}]),
91+
Config1 = case Group of
92+
cluster_size_1_ff_disabled ->
93+
rabbit_ct_helpers:merge_app_env(
94+
Config,
95+
{rabbit,
96+
[{forced_feature_flags_on_init,
97+
{rel, [], ['rabbitmq_4.2.0']}}]});
98+
_ ->
99+
Config
100+
end,
101+
Suffix = rabbit_ct_helpers:testcase_absname(Config1, "", "-"),
102+
Config2 = rabbit_ct_helpers:set_config(
103+
Config1, [{rmq_nodes_count, Nodes},
104+
{rmq_nodename_suffix, Suffix}]),
79105
rabbit_ct_helpers:run_setup_steps(
80-
Config1,
106+
Config2,
81107
rabbit_ct_broker_helpers:setup_steps() ++
82108
rabbit_ct_client_helpers:setup_steps()).
83109

@@ -93,6 +119,114 @@ init_per_testcase(Testcase, Config) ->
93119
end_per_testcase(Testcase, Config) ->
94120
rabbit_ct_helpers:testcase_finished(Config, Testcase).
95121

122+
%% Test enabling the feature flag while a client consumes from the volatile queue.
123+
%% Delete this test case when feature flag rabbitmq_4.2.0 becomes required.
124+
enable_ff(Config) ->
125+
RequestQueue = <<"request queue">>,
126+
RequestPayload = <<"my request">>,
127+
CorrelationId = <<"my correlation ID">>,
128+
RequesterCh = rabbit_ct_client_helpers:open_channel(Config),
129+
ResponderCh = rabbit_ct_client_helpers:open_channel(Config),
130+
131+
amqp_channel:subscribe(RequesterCh,
132+
#'basic.consume'{queue = ?REPLY_QUEUE,
133+
no_ack = true},
134+
self()),
135+
CTag = receive #'basic.consume_ok'{consumer_tag = CTag0} -> CTag0
136+
end,
137+
138+
#'queue.declare_ok'{} = amqp_channel:call(
139+
RequesterCh,
140+
#'queue.declare'{queue = RequestQueue}),
141+
#'confirm.select_ok'{} = amqp_channel:call(RequesterCh, #'confirm.select'{}),
142+
amqp_channel:register_confirm_handler(RequesterCh, self()),
143+
144+
%% Send the request.
145+
amqp_channel:cast(
146+
RequesterCh,
147+
#'basic.publish'{routing_key = RequestQueue},
148+
#amqp_msg{props = #'P_basic'{reply_to = ?REPLY_QUEUE,
149+
correlation_id = CorrelationId},
150+
payload = RequestPayload}),
151+
receive #'basic.ack'{} -> ok
152+
end,
153+
154+
%% Receive the request.
155+
{#'basic.get_ok'{},
156+
#amqp_msg{props = #'P_basic'{reply_to = ReplyTo,
157+
correlation_id = CorrelationId},
158+
payload = RequestPayload}
159+
} = amqp_channel:call(ResponderCh, #'basic.get'{queue = RequestQueue}),
160+
161+
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
162+
message_count = 0,
163+
consumer_count = 1},
164+
amqp_channel:call(ResponderCh,
165+
#'queue.declare'{queue = ReplyTo})),
166+
167+
%% Send the first reply.
168+
amqp_channel:cast(
169+
ResponderCh,
170+
#'basic.publish'{routing_key = ReplyTo},
171+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
172+
payload = <<"reply 1">>}),
173+
174+
%% Receive the frst reply.
175+
receive {#'basic.deliver'{consumer_tag = CTag,
176+
redelivered = false,
177+
exchange = <<>>,
178+
routing_key = ReplyTo},
179+
#amqp_msg{payload = P1,
180+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
181+
?assertEqual(<<"reply 1">>, P1)
182+
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
183+
end,
184+
185+
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
186+
187+
?assertEqual(#'queue.declare_ok'{queue = ReplyTo,
188+
message_count = 0,
189+
consumer_count = 1},
190+
amqp_channel:call(ResponderCh,
191+
#'queue.declare'{queue = ReplyTo})),
192+
193+
%% Send the second reply.
194+
amqp_channel:cast(
195+
ResponderCh,
196+
#'basic.publish'{routing_key = ReplyTo},
197+
#amqp_msg{props = #'P_basic'{correlation_id = CorrelationId},
198+
payload = <<"reply 2">>}),
199+
200+
%% Receive the second reply.
201+
receive {#'basic.deliver'{consumer_tag = CTag},
202+
#amqp_msg{payload = P2,
203+
props = #'P_basic'{correlation_id = CorrelationId}}} ->
204+
?assertEqual(<<"reply 2">>, P2)
205+
after ?TIMEOUT -> ct:fail({missing_reply, ?LINE})
206+
end,
207+
208+
%% Requester cancels consumption.
209+
?assertMatch(#'basic.cancel_ok'{consumer_tag = CTag},
210+
amqp_channel:call(RequesterCh, #'basic.cancel'{consumer_tag = CTag})),
211+
212+
%% Responder checks again if the requester is still there.
213+
%% This time, the requester and its queue should be gone.
214+
try amqp_channel:call(ResponderCh, #'queue.declare'{queue = ReplyTo}) of
215+
_ ->
216+
ct:fail("expected queue.declare to fail")
217+
catch exit:Reason ->
218+
?assertMatch(
219+
{{_, {_, _, <<"NOT_FOUND - no queue '",
220+
ReplyTo:(byte_size(ReplyTo))/binary,
221+
"' in vhost '/'">>}}, _},
222+
Reason)
223+
end,
224+
225+
%% Clean up.
226+
#'queue.delete_ok'{} = amqp_channel:call(RequesterCh,
227+
#'queue.delete'{queue = RequestQueue}),
228+
ok = rabbit_ct_client_helpers:close_channel(RequesterCh).
229+
96230
%% Test case for
97231
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11662
98232
trace(Config) ->
@@ -373,8 +507,6 @@ amqpl_amqp_amqpl(Config) ->
373507
%% Test that Direct Reply-To works when the requester is an AMQP 1.0 client
374508
%% and the responder is an AMQP 0.9.1 client.
375509
amqp_amqpl_amqp(Config) ->
376-
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0'),
377-
378510
RequestQ = atom_to_binary(?FUNCTION_NAME),
379511
AddrRequestQ = rabbitmq_amqp_address:queue(RequestQ),
380512
Id = <<"🥕"/utf8>>,

0 commit comments

Comments
 (0)