diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index 4644fc35561a..b9a5f339f485 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -148,6 +148,10 @@ init_source(State = #{source := #{current := #{link := Link}, #{delete_after := Rem} -> Rem; _ -> unlimited end, + case Remaining of + 0 -> exit({shutdown, autodelete}); + _ -> ok + end, State#{source => Src#{remaining => Remaining, remaining_unacked => Remaining, last_acked_tag => -1}}. @@ -302,12 +306,12 @@ ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> -spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state(). nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) -> % the tag is the same as the deliveryid - ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected), + ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, released), State#{source => Src#{last_nacked_tag => Tag}}; nack(Tag, true, State = #{source := #{current := #{link := LinkRef}, last_nacked_tag := LastTag} = Src}) -> First = LastTag + 1, - ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected), + ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, released), State#{source => Src#{last_nacked_tag => Tag}}. status(#{dest := #{current := #{link_state := attached}}}) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl index a6285901afa9..9b0565d7a7b0 100644 --- a/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_local_shovel.erl @@ -400,9 +400,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} = on_confirm when length(Queues) > 0 -> State2; on_publish -> - decr_remaining( - 1, - record_confirms([{Tag, Tag}], State2)); + record_confirms([{Tag, Tag}], State2); _ -> decr_remaining(1, State2) end), diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl index 603c7f81b806..e7695b7ef45c 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl @@ -113,7 +113,7 @@ internal_owner(Def) -> validate_src(Def) -> case protocols(Def) of {amqp091, _} -> validate_amqp091_src(Def); - {amqp10, _} -> []; + {amqp10, _} -> validate_amqp10_src(Def); {local, _} -> validate_local_src(Def) end. @@ -137,6 +137,14 @@ validate_amqp091_src(Def) -> ok end]. +validate_amqp10_src(Def) -> + [case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of + {N, <<"no-ack">>} when is_integer(N) -> + {error, "Cannot specify 'no-ack' and numerical 'delete-after'", []}; + _ -> + ok + end]. + validate_local_src(Def) -> [case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []}; diff --git a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl index ae8cdf47fab3..39044fefdd37 100644 --- a/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl @@ -40,7 +40,6 @@ groups() -> restart, change_definition, autodelete, - autodelete_with_rejections, validation, security_validation, get_connection_name, @@ -490,11 +489,8 @@ change_definition(Config) -> end). autodelete(Config) -> - autodelete_case(Config, {<<"on-confirm">>, 0, 100, 0}), - autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50}), autodelete_case(Config, {<<"on-confirm">>, <<"queue-length">>, 0, 100}), autodelete_case(Config, {<<"on-publish">>, <<"queue-length">>, 0, 100}), - autodelete_case(Config, {<<"on-publish">>, 50, 50, 50}), %% no-ack is not compatible with explicit count autodelete_case(Config, {<<"no-ack">>, <<"queue-length">>, 0, 100}), ok. @@ -520,35 +516,6 @@ autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> expect_count(Ch, <<"src">>, <<"hello">>, ExpSrc) end. -autodelete_with_rejections(Config) -> - Src = <<"src">>, - Dest = <<"dst">>, - Args = [{<<"x-max-length">>, long, 5}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}], - with_ch(Config, - fun (Ch) -> - amqp_channel:call(Ch, #'queue.declare'{queue = Dest, - durable = true, - arguments = Args}), - shovel_test_utils:set_param(Config, <<"test">>, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, 10}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest} - ]), - publish_count(Ch, <<>>, Src, <<"hello">>, 10), - await_autodelete(Config, <<"test">>), - Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), - eventually( - ?_assertMatch( - Expected, - lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, - ["list_queues", "name", "messages_ready", "--no-table-headers"])))) - end). - validation(Config) -> URIs = [{<<"src-uri">>, <<"amqp://">>}, {<<"dest-uri">>, <<"amqp://">>}], diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 7773cef146d3..5fdacaf41024 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -33,11 +33,6 @@ groups() -> {non_parallel_tests, [], [ simple, change_definition, - autodelete_amqp091_src_on_confirm, - autodelete_amqp091_src_on_publish, - autodelete_amqp091_dest_on_confirm, - autodelete_amqp091_dest_on_publish, - autodelete_with_rejections, simple_amqp10_dest, simple_amqp10_src, amqp091_to_amqp10_with_dead_lettering, @@ -270,117 +265,6 @@ change_definition(Config) -> amqp10_expect_empty(Sess, Dest2) end). -autodelete_amqp091_src_on_confirm(Config) -> - autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50}, - fun autodelete_amqp091_src/2), - ok. - -autodelete_amqp091_src_on_publish(Config) -> - autodelete_case(Config, {<<"on-publish">>, 50, 50, 50}, - fun autodelete_amqp091_src/2), - ok. - -autodelete_amqp091_dest_on_confirm(Config) -> - autodelete_case(Config, {<<"on-confirm">>, 50, 50, 50}, - fun autodelete_amqp091_dest/2), - ok. - -autodelete_amqp091_dest_on_publish(Config) -> - autodelete_case(Config, {<<"on-publish">>, 50, 50, 50}, - fun autodelete_amqp091_dest/2), - ok. - -autodelete_case(Config, Args, CaseFun) -> - with_amqp10_session(Config, CaseFun(Config, Args)). - -autodelete_do(Config, {AckMode, After, ExpSrc, ExpDest}) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - fun (Session) -> - amqp10_publish(Session, Src, <<"hello">>, 100), - shovel_test_utils:set_param_nowait( - Config, - ?PARAM, [{<<"src-address">>, Src}, - {<<"src-protocol">>, <<"amqp10">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-address">>, Dest}, - {<<"dest-protocol">>, <<"amqp10">>}, - {<<"ack-mode">>, AckMode} - ]), - await_autodelete(Config, <<"test">>), - amqp10_expect_count(Session, Dest, ExpDest), - amqp10_expect_count(Session, Src, ExpSrc) - end. - -autodelete_amqp091_src(Config, {AckMode, After, ExpSrc, ExpDest}) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - fun (Session) -> - amqp10_publish(Session, Src, <<"hello">>, 100), - shovel_test_utils:set_param_nowait( - Config, - ?PARAM, [{<<"src-queue">>, Src}, - {<<"src-protocol">>, <<"amqp091">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-address">>, Dest}, - {<<"dest-protocol">>, <<"amqp10">>}, - {<<"ack-mode">>, AckMode} - ]), - await_autodelete(Config, <<"test">>), - amqp10_expect_count(Session, Dest, ExpDest), - amqp10_expect_count(Session, Src, ExpSrc) - end. - -autodelete_amqp091_dest(Config, {AckMode, After, ExpSrc, ExpDest}) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - fun (Session) -> - amqp10_publish(Session, Src, <<"hello">>, 100), - shovel_test_utils:set_param_nowait( - Config, - ?PARAM, [{<<"src-address">>, Src}, - {<<"src-protocol">>, <<"amqp10">>}, - {<<"src-delete-after">>, After}, - {<<"src-prefetch-count">>, 5}, - {<<"dest-queue">>, Dest}, - {<<"dest-protocol">>, <<"amqp091">>}, - {<<"ack-mode">>, AckMode} - ]), - await_autodelete(Config, <<"test">>), - amqp10_expect_count(Session, Dest, ExpDest), - amqp10_expect_count(Session, Src, ExpSrc) - end. - -autodelete_with_rejections(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session( - Config, - fun (Sess) -> - amqp10_declare_queue(Sess, Dest, #{<<"x-max-length">> => {uint, 5}, - <<"x-overflow">> => {utf8, <<"reject-publish">>}}), - - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, 10}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest} - ]), - amqp10_publish(Sess, Src, <<"hello">>, 10), - await_autodelete(Config, <<"test">>), - Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), - ?awaitMatch( - Expected, - lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, - ["list_queues", "name", "messages_ready", "--no-table-headers"])), - 30_000) - end). - test_amqp10_delete_after_queue_length(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), diff --git a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl index 0b58e45e2ea6..5b34d7c48e4c 100644 --- a/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl @@ -53,8 +53,6 @@ groups() -> local_to_local_delete_after_never, local_to_local_delete_after_queue_length, local_to_local_delete_after_queue_length_zero, - local_to_local_delete_after_number, - local_to_local_delete_after_with_rejections, local_to_local_no_ack, local_to_local_quorum_no_ack, local_to_local_stream_no_ack, @@ -567,54 +565,6 @@ local_to_local_delete_after_queue_length(Config) -> amqp10_expect_empty(Sess, Dest) end). -local_to_local_delete_after_number(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - with_amqp10_session(Config, - fun (Sess) -> - amqp10_publish(Sess, Src, <<"tag1">>, 5), - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, 10}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-queue">>, Dest} - ]), - amqp10_expect_count(Sess, Dest, 5), - amqp10_publish(Sess, Src, <<"tag1">>, 10), - amqp10_expect_count(Sess, Dest, 5), - await_autodelete(Config, ?PARAM), - amqp10_expect_empty(Sess, Dest) - end). - -local_to_local_delete_after_with_rejections(Config) -> - Src = ?config(srcq, Config), - Dest = ?config(destq, Config), - VHost = <<"/">>, - declare_queue(Config, VHost, Dest, [{<<"x-max-length">>, long, 5}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}]), - with_amqp10_session(Config, - fun (Sess) -> - shovel_test_utils:set_param(Config, ?PARAM, - [{<<"src-protocol">>, <<"local">>}, - {<<"src-queue">>, Src}, - {<<"src-delete-after">>, 10}, - {<<"dest-protocol">>, <<"local">>}, - {<<"dest-predeclared">>, true}, - {<<"dest-queue">>, Dest} - ]), - amqp10_publish(Sess, Src, <<"tag1">>, 10), - ?awaitMatch(not_found, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_runtime_parameters, lookup, [<<"/">>, <<"shovel">>, ?PARAM]), 30_000), - Expected = lists:sort([[Src, <<"5">>], [Dest, <<"5">>]]), - ?awaitMatch( - Expected, - lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, - ["list_queues", "name", "messages_ready", "--no-table-headers"])), - 30_000) - - end). - local_to_local_no_ack(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), diff --git a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl index 6d946f8d6739..10c8bb8f1145 100644 --- a/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl @@ -14,10 +14,16 @@ -compile(export_all). --import(shovel_test_utils, [set_param/3, +-import(rabbit_ct_helpers, [eventually/3]). +-import(shovel_test_utils, [await_autodelete/2, + set_param/3, + set_param_nowait/3, with_amqp10_session/2, amqp10_publish_expect/5, - amqp10_declare_queue/3]). + amqp10_declare_queue/3, + amqp10_publish/4, + amqp10_expect_count/3 + ]). -define(PARAM, <<"test">>). @@ -55,7 +61,20 @@ tests() -> simple_classic_on_publish, simple_quorum_no_ack, simple_quorum_on_confirm, - simple_quorum_on_publish + simple_quorum_on_publish, + autodelete_classic_on_confirm, + autodelete_quorum_on_confirm, + autodelete_classic_on_publish, + autodelete_quorum_on_publish, + autodelete_no_ack, + autodelete_classic_on_confirm_no_transfer, + autodelete_quorum_on_confirm_no_transfer, + autodelete_classic_on_publish_no_transfer, + autodelete_quorum_on_publish_no_transfer, + autodelete_classic_on_confirm_with_rejections, + autodelete_quorum_on_confirm_with_rejections, + autodelete_classic_on_publish_with_rejections, + autodelete_quorum_on_publish_with_rejections ]. %% ------------------------------------------------------------------- @@ -240,6 +259,137 @@ simple_queue_type_ack_mode(Config, Type, AckMode) -> amqp10_publish_expect(Sess, Src, Dest, <<"hello">>, 10) end). +<<<<<<< HEAD +======= +autodelete_classic_on_confirm_no_transfer(Config) -> + autodelete(Config, <<"classic">>, <<"on-confirm">>, 0, 100, 0). + +autodelete_quorum_on_confirm_no_transfer(Config) -> + autodelete(Config, <<"quorum">>, <<"on-confirm">>, 0, 100, 0). + +autodelete_classic_on_publish_no_transfer(Config) -> + autodelete(Config, <<"classic">>, <<"on-publish">>, 0, 100, 0). + +autodelete_quorum_on_publish_no_transfer(Config) -> + autodelete(Config, <<"quorum">>, <<"on-publish">>, 0, 100, 0). + +autodelete_classic_on_confirm(Config) -> + autodelete(Config, <<"classic">>, <<"on-confirm">>, 50, 50, 50). + +autodelete_quorum_on_confirm(Config) -> + autodelete(Config, <<"quorum">>, <<"on-confirm">>, 50, 50, 50). + +autodelete_classic_on_publish(Config) -> + autodelete(Config, <<"classic">>, <<"on-publish">>, 50, 50, 50). + +autodelete_quorum_on_publish(Config) -> + autodelete(Config, <<"quorum">>, <<"on-publish">>, 50, 50, 50). + +autodelete_no_ack(Config) -> + ExtraArgs = [{<<"ack-mode">>, <<"no-ack">>}, + {<<"src-delete-after">>, 100}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + Uri = shovel_test_utils:make_uri(Config, 0), + ?assertMatch({error_string, _}, + rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_runtime_parameters, set, + [<<"/">>, <<"shovel">>, ?PARAM, + [{<<"src-uri">>, Uri}, + {<<"dest-uri">>, [Uri]}] ++ ShovelArgs, + none])). + +autodelete(Config, Type, AckMode, After, ExpSrc, ExpDest) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_publish(Sess, Src, <<"hello">>, 100), + ExtraArgs = [{<<"ack-mode">>, AckMode}, + {<<"src-delete-after">>, After}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + set_param_nowait(Config, ?PARAM, ShovelArgs), + await_autodelete(Config, ?PARAM), + amqp10_expect_count(Sess, Src, ExpSrc), + amqp10_expect_count(Sess, Dest, ExpDest) + end). + +<<<<<<< HEAD +>>>>>>> d5f9ff27b (Shovel tests: tests for autodelete common to all protocols) +======= +autodelete_classic_on_confirm_with_rejections(Config) -> + autodelete_with_rejections(Config, <<"classic">>, <<"on-confirm">>, 5, 5). + +autodelete_quorum_on_confirm_with_rejections(Config) -> + ExpSrc = fun(ExpDest) -> 100 - ExpDest end, + autodelete_with_quorum_rejections(Config, <<"on-confirm">>, ExpSrc). + +autodelete_classic_on_publish_with_rejections(Config) -> + autodelete_with_rejections(Config, <<"classic">>, <<"on-publish">>, 0, 5). + +autodelete_quorum_on_publish_with_rejections(Config) -> + ExpSrc = fun(_) -> 0 end, + autodelete_with_quorum_rejections(Config, <<"on-publish">>, ExpSrc). + +autodelete_with_rejections(Config, Type, AckMode, ExpSrc, ExpDest) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}, + <<"x-max-length">> => {ulong, 5} + }), + amqp10_publish(Sess, Src, <<"hello">>, 10), + ExtraArgs = [{<<"ack-mode">>, AckMode}, + {<<"src-delete-after">>, 10}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + set_param_nowait(Config, ?PARAM, ShovelArgs), + await_autodelete(Config, ?PARAM), + Expected = lists:sort([[Src, integer_to_binary(ExpSrc)], + [Dest, integer_to_binary(ExpDest)]]), + ?awaitMatch( + Expected, + lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages", "--no-table-headers"])), + 45_000), + amqp10_expect_count(Sess, Src, ExpSrc), + amqp10_expect_count(Sess, Dest, ExpDest) + end). + +autodelete_with_quorum_rejections(Config, AckMode, ExpSrcFun) -> + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + Type = <<"quorum">>, + with_amqp10_session( + Config, + fun (Sess) -> + amqp10_declare_queue(Sess, Src, #{<<"x-queue-type">> => {utf8, Type}}), + amqp10_declare_queue(Sess, Dest, #{<<"x-queue-type">> => {utf8, Type}, + <<"x-overflow">> => {utf8, <<"reject-publish">>}, + <<"x-max-length">> => {ulong, 5} + }), + amqp10_publish(Sess, Src, <<"hello">>, 100), + ExtraArgs = [{<<"ack-mode">>, AckMode}, + {<<"src-delete-after">>, 50}], + ShovelArgs = ?config(shovel_args, Config) ++ ExtraArgs, + set_param_nowait(Config, ?PARAM, ShovelArgs), + await_autodelete(Config, ?PARAM), + eventually( + ?_assert( + list_queue_messages(Config, Dest) >= 5), + 1000, 45), + ExpDest = list_queue_messages(Config, Dest), + amqp10_expect_count(Sess, Src, ExpSrcFun(ExpDest)), + amqp10_expect_count(Sess, Dest, ExpDest) + end). + +>>>>>>> 059813a83 (Shovel: more common testcases) %%---------------------------------------------------------------------------- maybe_skip_local_protocol(Config) -> [Node] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -250,3 +400,12 @@ maybe_skip_local_protocol(Config) -> _ -> {skip, "This group requires rabbitmq_4.0.0 feature flag"} end. + +list_queue_messages(Config, QName) -> + List = rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, + ["list_queues", "name", "messages", "--no-table-headers"]), + [[_, Messages]] = lists:filter(fun([Q, _]) -> + Q == QName + end, List), + binary_to_integer(Messages).