Skip to content

Commit 0ee1c66

Browse files
Merge branch 'shovel-tests'
2 parents a5d95e5 + 1471f23 commit 0ee1c66

File tree

7 files changed

+183
-208
lines changed

7 files changed

+183
-208
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ init_source(State = #{source := #{current := #{link := Link},
148148
#{delete_after := Rem} -> Rem;
149149
_ -> unlimited
150150
end,
151+
case Remaining of
152+
0 -> exit({shutdown, autodelete});
153+
_ -> ok
154+
end,
151155
State#{source => Src#{remaining => Remaining,
152156
remaining_unacked => Remaining,
153157
last_acked_tag => -1}}.
@@ -302,12 +306,12 @@ ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
302306
-spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state().
303307
nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
304308
% the tag is the same as the deliveryid
305-
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected),
309+
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, released),
306310
State#{source => Src#{last_nacked_tag => Tag}};
307311
nack(Tag, true, State = #{source := #{current := #{link := LinkRef},
308312
last_nacked_tag := LastTag} = Src}) ->
309313
First = LastTag + 1,
310-
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected),
314+
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, released),
311315
State#{source => Src#{last_nacked_tag => Tag}}.
312316

313317
status(#{dest := #{current := #{link_state := attached}}}) ->

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,9 +388,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
388388
on_confirm when length(Queues) > 0 ->
389389
State2;
390390
on_publish ->
391-
decr_remaining(
392-
1,
393-
record_confirms([{Tag, Tag}], State2));
391+
record_confirms([{Tag, Tag}], State2);
394392
_ ->
395393
decr_remaining(1, State2)
396394
end),

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ internal_owner(Def) ->
113113
validate_src(Def) ->
114114
case protocols(Def) of
115115
{amqp091, _} -> validate_amqp091_src(Def);
116-
{amqp10, _} -> [];
116+
{amqp10, _} -> validate_amqp10_src(Def);
117117
{local, _} -> validate_local_src(Def)
118118
end.
119119

@@ -137,6 +137,14 @@ validate_amqp091_src(Def) ->
137137
ok
138138
end].
139139

140+
validate_amqp10_src(Def) ->
141+
[case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of
142+
{N, <<"no-ack">>} when is_integer(N) ->
143+
{error, "Cannot specify 'no-ack' and numerical 'delete-after'", []};
144+
_ ->
145+
ok
146+
end].
147+
140148
validate_local_src(Def) ->
141149
[case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of
142150
zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};

deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ groups() ->
4040
restart,
4141
change_definition,
4242
autodelete,
43-
autodelete_with_rejections,
4443
validation,
4544
security_validation,
4645
get_connection_name,
@@ -490,11 +489,8 @@ change_definition(Config) ->
490489
end).
491490

492491
autodelete(Config) ->
493-
autodelete_case(Config, {<<"on-confirm">>, 0, 100, 0}),
494-
autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50}),
495492
autodelete_case(Config, {<<"on-confirm">>, <<"queue-length">>, 0, 100}),
496493
autodelete_case(Config, {<<"on-publish">>, <<"queue-length">>, 0, 100}),
497-
autodelete_case(Config, {<<"on-publish">>, 50, 50, 50}),
498494
%% no-ack is not compatible with explicit count
499495
autodelete_case(Config, {<<"no-ack">>, <<"queue-length">>, 0, 100}),
500496
ok.
@@ -520,35 +516,6 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
520516
expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc)
521517
end.
522518

523-
autodelete_with_rejections(Config) ->
524-
Src = <<"src">>,
525-
Dest = <<"dst">>,
526-
Args = [{<<"x-max-length">>, long, 5},
527-
{<<"x-overflow">>, longstr, <<"reject-publish">>}],
528-
with_ch(Config,
529-
fun (Ch) ->
530-
amqp_channel:call(Ch, #'queue.declare'{queue = Dest,
531-
durable = true,
532-
arguments = Args}),
533-
shovel_test_utils:set_param(Config, <<"test">>,
534-
[{<<"src-protocol">>, <<"local">>},
535-
{<<"src-queue">>, Src},
536-
{<<"src-delete-after">>, 10},
537-
{<<"dest-protocol">>, <<"local">>},
538-
{<<"dest-predeclared">>, true},
539-
{<<"dest-queue">>, Dest}
540-
]),
541-
publish_count(Ch, <<>>, Src, <<"hello">>, 10),
542-
await_autodelete(Config, <<"test">>),
543-
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
544-
eventually(
545-
?_assertMatch(
546-
Expected,
547-
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
548-
Config, 0,
549-
["list_queues", "name", "messages_ready", "--no-table-headers"]))))
550-
end).
551-
552519
validation(Config) ->
553520
URIs = [{<<"src-uri">>, <<"amqp://">>},
554521
{<<"dest-uri">>, <<"amqp://">>}],

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,6 @@ groups() ->
3333
{non_parallel_tests, [], [
3434
simple,
3535
change_definition,
36-
autodelete_amqp091_src_on_confirm,
37-
autodelete_amqp091_src_on_publish,
38-
autodelete_amqp091_dest_on_confirm,
39-
autodelete_amqp091_dest_on_publish,
40-
autodelete_with_rejections,
4136
simple_amqp10_dest,
4237
simple_amqp10_src,
4338
amqp091_to_amqp10_with_dead_lettering,
@@ -270,117 +265,6 @@ change_definition(Config) ->
270265
amqp10_expect_empty(Sess, Dest2)
271266
end).
272267

273-
autodelete_amqp091_src_on_confirm(Config) ->
274-
autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50},
275-
fun autodelete_amqp091_src/2),
276-
ok.
277-
278-
autodelete_amqp091_src_on_publish(Config) ->
279-
autodelete_case(Config, {<<"on-publish">>, 50, 50, 50},
280-
fun autodelete_amqp091_src/2),
281-
ok.
282-
283-
autodelete_amqp091_dest_on_confirm(Config) ->
284-
autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50},
285-
fun autodelete_amqp091_dest/2),
286-
ok.
287-
288-
autodelete_amqp091_dest_on_publish(Config) ->
289-
autodelete_case(Config, {<<"on-publish">>, 50, 50, 50},
290-
fun autodelete_amqp091_dest/2),
291-
ok.
292-
293-
autodelete_case(Config, Args, CaseFun) ->
294-
with_amqp10_session(Config, CaseFun(Config, Args)).
295-
296-
autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) ->
297-
Src = ?config(srcq, Config),
298-
Dest = ?config(destq, Config),
299-
fun (Session) ->
300-
amqp10_publish(Session, Src, <<"hello">>, 100),
301-
shovel_test_utils:set_param_nowait(
302-
Config,
303-
?PARAM, [{<<"src-address">>, Src},
304-
{<<"src-protocol">>, <<"amqp10">>},
305-
{<<"src-delete-after">>, After},
306-
{<<"src-prefetch-count">>, 5},
307-
{<<"dest-address">>, Dest},
308-
{<<"dest-protocol">>, <<"amqp10">>},
309-
{<<"ack-mode">>, AckMode}
310-
]),
311-
await_autodelete(Config, <<"test">>),
312-
amqp10_expect_count(Session, Dest, ExpDest),
313-
amqp10_expect_count(Session, Src, ExpSrc)
314-
end.
315-
316-
autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) ->
317-
Src = ?config(srcq, Config),
318-
Dest = ?config(destq, Config),
319-
fun (Session) ->
320-
amqp10_publish(Session, Src, <<"hello">>, 100),
321-
shovel_test_utils:set_param_nowait(
322-
Config,
323-
?PARAM, [{<<"src-queue">>, Src},
324-
{<<"src-protocol">>, <<"amqp091">>},
325-
{<<"src-delete-after">>, After},
326-
{<<"src-prefetch-count">>, 5},
327-
{<<"dest-address">>, Dest},
328-
{<<"dest-protocol">>, <<"amqp10">>},
329-
{<<"ack-mode">>, AckMode}
330-
]),
331-
await_autodelete(Config, <<"test">>),
332-
amqp10_expect_count(Session, Dest, ExpDest),
333-
amqp10_expect_count(Session, Src, ExpSrc)
334-
end.
335-
336-
autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) ->
337-
Src = ?config(srcq, Config),
338-
Dest = ?config(destq, Config),
339-
fun (Session) ->
340-
amqp10_publish(Session, Src, <<"hello">>, 100),
341-
shovel_test_utils:set_param_nowait(
342-
Config,
343-
?PARAM, [{<<"src-address">>, Src},
344-
{<<"src-protocol">>, <<"amqp10">>},
345-
{<<"src-delete-after">>, After},
346-
{<<"src-prefetch-count">>, 5},
347-
{<<"dest-queue">>, Dest},
348-
{<<"dest-protocol">>, <<"amqp091">>},
349-
{<<"ack-mode">>, AckMode}
350-
]),
351-
await_autodelete(Config, <<"test">>),
352-
amqp10_expect_count(Session, Dest, ExpDest),
353-
amqp10_expect_count(Session, Src, ExpSrc)
354-
end.
355-
356-
autodelete_with_rejections(Config) ->
357-
Src = ?config(srcq, Config),
358-
Dest = ?config(destq, Config),
359-
with_amqp10_session(
360-
Config,
361-
fun (Sess) ->
362-
amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5},
363-
<<"x-overflow">> => {utf8, <<"reject-publish">>}}),
364-
365-
shovel_test_utils:set_param(Config, ?PARAM,
366-
[{<<"src-protocol">>, <<"local">>},
367-
{<<"src-queue">>, Src},
368-
{<<"src-delete-after">>, 10},
369-
{<<"dest-protocol">>, <<"local">>},
370-
{<<"dest-predeclared">>, true},
371-
{<<"dest-queue">>, Dest}
372-
]),
373-
amqp10_publish(Sess, Src, <<"hello">>, 10),
374-
await_autodelete(Config, <<"test">>),
375-
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
376-
?awaitMatch(
377-
Expected,
378-
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
379-
Config, 0,
380-
["list_queues", "name", "messages_ready", "--no-table-headers"])),
381-
30_000)
382-
end).
383-
384268
test_amqp10_delete_after_queue_length(Config) ->
385269
Src = ?config(srcq, Config),
386270
Dest = ?config(destq, Config),

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 0 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ groups() ->
5353
local_to_local_delete_after_never,
5454
local_to_local_delete_after_queue_length,
5555
local_to_local_delete_after_queue_length_zero,
56-
local_to_local_delete_after_number,
57-
local_to_local_delete_after_with_rejections,
5856
local_to_local_no_ack,
5957
local_to_local_quorum_no_ack,
6058
local_to_local_stream_no_ack,
@@ -560,54 +558,6 @@ local_to_local_delete_after_queue_length(Config) ->
560558
amqp10_expect_empty(Sess, Dest)
561559
end).
562560

563-
local_to_local_delete_after_number(Config) ->
564-
Src = ?config(srcq, Config),
565-
Dest = ?config(destq, Config),
566-
with_amqp10_session(Config,
567-
fun (Sess) ->
568-
amqp10_publish(Sess, Src, <<"tag1">>, 5),
569-
shovel_test_utils:set_param(Config, ?PARAM,
570-
[{<<"src-protocol">>, <<"local">>},
571-
{<<"src-queue">>, Src},
572-
{<<"src-delete-after">>, 10},
573-
{<<"dest-protocol">>, <<"local">>},
574-
{<<"dest-queue">>, Dest}
575-
]),
576-
amqp10_expect_count(Sess, Dest, 5),
577-
amqp10_publish(Sess, Src, <<"tag1">>, 10),
578-
amqp10_expect_count(Sess, Dest, 5),
579-
await_autodelete(Config, ?PARAM),
580-
amqp10_expect_empty(Sess, Dest)
581-
end).
582-
583-
local_to_local_delete_after_with_rejections(Config) ->
584-
Src = ?config(srcq, Config),
585-
Dest = ?config(destq, Config),
586-
VHost = <<"/">>,
587-
declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5},
588-
{<<"x-overflow">>, longstr, <<"reject-publish">>}]),
589-
with_amqp10_session(Config,
590-
fun (Sess) ->
591-
shovel_test_utils:set_param(Config, ?PARAM,
592-
[{<<"src-protocol">>, <<"local">>},
593-
{<<"src-queue">>, Src},
594-
{<<"src-delete-after">>, 10},
595-
{<<"dest-protocol">>, <<"local">>},
596-
{<<"dest-predeclared">>, true},
597-
{<<"dest-queue">>, Dest}
598-
]),
599-
amqp10_publish(Sess, Src, <<"tag1">>, 10),
600-
?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000),
601-
Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]),
602-
?awaitMatch(
603-
Expected,
604-
lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
605-
Config, 0,
606-
["list_queues", "name", "messages_ready", "--no-table-headers"])),
607-
30_000)
608-
609-
end).
610-
611561
local_to_local_no_ack(Config) ->
612562
Src = ?config(srcq, Config),
613563
Dest = ?config(destq, Config),

0 commit comments

Comments
 (0)