Skip to content

Commit 056efd2

Browse files
Merge pull request #14708 from rabbitmq/mergify/bp/v4.2.x/pr-14704
Shovels: increase forwarded counter for AMQP1.0 (backport #14704)
2 parents 0012652 + 5d97501 commit 056efd2

File tree

8 files changed

+516
-464
lines changed

8 files changed

+516
-464
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,23 +333,24 @@ forward(Tag, Mc,
333333
State#{dest => Dst#{pending => {Pend}}};
334334
forward(Tag, Msg0,
335335
#{dest := #{current := #{link := Link},
336-
unacked := Unacked} = Dst,
336+
unacked := Unacked},
337337
ack_mode := AckMode} = State) ->
338338
OutTag = rabbit_data_coercion:to_binary(Tag),
339339
Msg1 = add_timestamp_header(State, add_forward_headers(State, Msg0)),
340340
Msg2 = mc:protocol_state(mc:convert(mc_amqp, Msg1)),
341341
Msg3 = amqp10_raw_msg:new(AckMode =/= on_confirm, Tag, iolist_to_binary(Msg2)),
342342
case send_msg(Link, Msg3) of
343343
ok ->
344+
#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),
344345
rabbit_shovel_behaviour:decr_remaining_unacked(
345346
case AckMode of
346347
no_ack ->
347-
rabbit_shovel_behaviour:decr_remaining(1, State);
348+
rabbit_shovel_behaviour:decr_remaining(1, State1);
348349
on_confirm ->
349-
State#{dest => Dst#{unacked => Unacked#{OutTag => Tag}}};
350+
State1#{dest => Dst1#{unacked => Unacked#{OutTag => Tag}}};
350351
on_publish ->
351-
State1 = rabbit_shovel_behaviour:ack(Tag, false, State),
352-
rabbit_shovel_behaviour:decr_remaining(1, State1)
352+
State2 = rabbit_shovel_behaviour:ack(Tag, false, State1),
353+
rabbit_shovel_behaviour:decr_remaining(1, State2)
353354
end);
354355
Stop ->
355356
Stop

deps/rabbitmq_shovel/test/amqp091_dynamic_SUITE.erl

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
-include_lib("amqp_client/include/amqp_client.hrl").
1212

1313
-import(rabbit_ct_helpers, [eventually/1]).
14+
-import(shovel_test_utils, [await_autodelete/2,
15+
invalid_param/2, invalid_param/3,
16+
valid_param/2, valid_param/3]).
1417

1518
-compile(export_all).
1619

@@ -27,7 +30,6 @@ all() ->
2730
groups() ->
2831
[
2932
{core_tests, [], [
30-
simple,
3133
set_properties_using_proplist,
3234
set_properties_using_map,
3335
set_empty_properties_using_proplist,
@@ -115,21 +117,6 @@ end_per_testcase(Testcase, Config) ->
115117
%% -------------------------------------------------------------------
116118
%% Testcases.
117119
%% -------------------------------------------------------------------
118-
119-
simple(Config) ->
120-
Name = <<"test">>,
121-
with_ch(Config,
122-
fun (Ch) ->
123-
shovel_test_utils:set_param(
124-
Config,
125-
Name, [{<<"src-queue">>, <<"src">>},
126-
{<<"dest-queue">>, <<"dest">>}]),
127-
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>),
128-
Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
129-
?assertMatch([_|_], Status),
130-
?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status))
131-
end).
132-
133120
quorum_queues(Config) ->
134121
with_ch(Config,
135122
fun (Ch) ->
@@ -814,23 +801,6 @@ expect_count(Ch, Q, M, Count) ->
814801
end || _ <- lists:seq(1, Count)],
815802
expect_empty(Ch, Q).
816803

817-
invalid_param(Config, Value, User) ->
818-
{error_string, _} = rabbit_ct_broker_helpers:rpc(Config, 0,
819-
rabbit_runtime_parameters, set,
820-
[<<"/">>, <<"shovel">>, <<"invalid">>, Value, User]).
821-
822-
valid_param(Config, Value, User) ->
823-
rabbit_ct_broker_helpers:rpc(Config, 0,
824-
?MODULE, valid_param1, [Config, Value, User]).
825-
826-
valid_param1(_Config, Value, User) ->
827-
ok = rabbit_runtime_parameters:set(
828-
<<"/">>, <<"shovel">>, <<"name">>, Value, User),
829-
ok = rabbit_runtime_parameters:clear(<<"/">>, <<"shovel">>, <<"name">>, <<"acting-user">>).
830-
831-
invalid_param(Config, Value) -> invalid_param(Config, Value, none).
832-
valid_param(Config, Value) -> valid_param(Config, Value, none).
833-
834804
lookup_user(Config, Name) ->
835805
{ok, User} = rabbit_ct_broker_helpers:rpc(Config, 0,
836806
rabbit_access_control, check_user_login, [Name, []]),
@@ -849,23 +819,6 @@ cleanup1(_Config) ->
849819
[rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>)
850820
|| Q <- rabbit_amqqueue:list()].
851821

852-
await_autodelete(Config, Name) ->
853-
rabbit_ct_broker_helpers:rpc(Config, 0,
854-
?MODULE, await_autodelete1, [Config, Name]).
855-
856-
await_autodelete1(_Config, Name) ->
857-
shovel_test_utils:await(
858-
fun () -> not lists:member(Name, shovels_from_parameters()) end),
859-
shovel_test_utils:await(
860-
fun () ->
861-
not lists:member(Name,
862-
shovel_test_utils:shovels_from_status())
863-
end).
864-
865-
shovels_from_parameters() ->
866-
L = rabbit_runtime_parameters:list(<<"/">>, <<"shovel">>),
867-
[rabbit_misc:pget(name, Shovel) || Shovel <- L].
868-
869822
set_default_credit(Config, Value) ->
870823
Key = credit_flow_default_credit,
871824
OrigValue = rabbit_ct_broker_helpers:rpc(Config, persistent_term, get, [Key]),

0 commit comments

Comments
 (0)