Skip to content

Commit a067423

Browse files
Merge pull request #14592 from rabbitmq/mergify/bp/v4.2.x/pr-14561
Local shovels: Add global counters (backport #14561)
2 parents fd5a120 + 11ef8f9 commit a067423

File tree

2 files changed

+101
-4
lines changed

2 files changed

+101
-4
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 62 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,15 @@
1515
-include_lib("rabbit/include/mc.hrl").
1616
-include("rabbit_shovel.hrl").
1717

18+
-rabbit_boot_step({rabbit_global_local_shovel_counters,
19+
[{description, "global local shovel counters"},
20+
{mfa, {?MODULE, boot_step,
21+
[]}},
22+
{requires, rabbit_global_counters},
23+
{enables, external_infrastructure}]}).
24+
1825
-export([
26+
boot_step/0,
1927
parse/2,
2028
connect_source/1,
2129
connect_dest/1,
@@ -53,12 +61,20 @@
5361
%% See rabbit_amqp_session.erl
5462
-define(INITIAL_DELIVERY_COUNT, 16#ff_ff_ff_ff - 4).
5563
-define(DEFAULT_MAX_LINK_CREDIT, 1000).
64+
-define(PROTOCOL, 'local-shovel').
5665

5766
-record(pending_ack, {
5867
delivery_tag,
5968
msg_id
6069
}).
6170

71+
boot_step() ->
72+
Labels = #{protocol => ?PROTOCOL},
73+
rabbit_global_counters:init(Labels),
74+
rabbit_global_counters:init(Labels#{queue_type => rabbit_classic_queue}),
75+
rabbit_global_counters:init(Labels#{queue_type => rabbit_quorum_queue}),
76+
rabbit_global_counters:init(Labels#{queue_type => rabbit_stream_queue}).
77+
6278
parse(_Name, {source, Source}) ->
6379
Queue = parse_parameter(queue, fun parse_binary/1,
6480
proplists:get_value(queue, Source)),
@@ -185,6 +201,7 @@ init_source(State = #{source := #{queue_r := QName,
185201
end
186202
end) of
187203
{Remaining, {ok, QState1}} ->
204+
rabbit_global_counters:consumer_created(?PROTOCOL),
188205
{ok, QState, Actions} = rabbit_queue_type:credit(QName, CTag, ?INITIAL_DELIVERY_COUNT, MaxLinkCredit, false, QState1),
189206
State2 = State#{source => Src#{current => Current#{queue_states => QState,
190207
consumer_tag => CTag},
@@ -214,6 +231,7 @@ init_source(State = #{source := #{queue_r := QName,
214231
init_dest(#{name := Name,
215232
shovel_type := Type,
216233
dest := #{add_forward_headers := AFH} = Dst} = State) ->
234+
rabbit_global_counters:publisher_created(?PROTOCOL),
217235
_TRef = erlang:send_after(1000, self(), send_confirms_and_nacks),
218236
case AFH of
219237
true ->
@@ -258,12 +276,14 @@ dest_endpoint(_Config) ->
258276
[].
259277

260278
close_dest(_State) ->
279+
rabbit_global_counters:publisher_deleted(?PROTOCOL),
261280
ok.
262281

263282
close_source(#{source := #{current := #{queue_states := QStates0,
264283
consumer_tag := CTag,
265284
user := User},
266285
queue_r := QName}}) ->
286+
rabbit_global_counters:consumer_deleted(?PROTOCOL),
267287
case rabbit_amqqueue:with(
268288
QName,
269289
fun(Q) ->
@@ -363,6 +383,7 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current} =
363383
Msg = set_annotations(Msg0, Dest),
364384
RoutedQNames = route(Msg, Dest),
365385
Queues = rabbit_amqqueue:lookup_many(RoutedQNames),
386+
messages_received(AckMode),
366387
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
367388
{ok, QState1, Actions} ->
368389
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
@@ -451,13 +472,15 @@ handle_queue_actions(Actions, State) ->
451472
end, State, Actions).
452473

453474
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
475+
NumMsgs = length(Msgs),
454476
maybe_grant_credit(
455477
lists:foldl(
456-
fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
478+
fun({QName, _QPid, MsgId, _Redelivered, Mc}, #{source := #{current := #{queue_states := QStates }}} = S0) ->
479+
messages_delivered(QName, QStates),
457480
DeliveryTag = next_tag(S0),
458481
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
459482
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
460-
end, sent_delivery(State, length(Msgs)), Msgs)).
483+
end, sent_delivery(State, NumMsgs), Msgs)).
461484

462485
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
463486
DeliveryTag.
@@ -616,6 +639,7 @@ settle(Op, DeliveryTag, Multiple,
616639
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
617640
case rabbit_queue_type:settle(QRef, Op, CTag, lists:reverse(MsgIds), QState0) of
618641
{ok, QState1, Actions} ->
642+
messages_acknowledged(Op, QRef, QState1, MsgIds),
619643
State = State0#{source => Src#{current => Current#{queue_states => QState1,
620644
unacked_message_q => UAMQ}}},
621645
handle_queue_actions(Actions, State);
@@ -739,12 +763,18 @@ handle_credit_reply({credit_reply, CTag, DeliveryCount, Credit, _Available, _Dra
739763
at_least_one_credit_req_in_flight => false}}
740764
end.
741765

742-
process_routing_confirm(undefined, _, _, State) ->
766+
process_routing_confirm(undefined, _, [], State) ->
767+
rabbit_global_counters:messages_unroutable_returned(?PROTOCOL, 1),
768+
State;
769+
process_routing_confirm(undefined, _, QRefs, State) ->
770+
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
743771
State;
744772
process_routing_confirm(MsgSeqNo, Tag, [], State)
745773
when is_integer(MsgSeqNo) ->
774+
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
746775
record_confirms([{MsgSeqNo, Tag}], State);
747776
process_routing_confirm(MsgSeqNo, Tag, QRefs, #{dest := Dst = #{unconfirmed := Unconfirmed}} = State) when is_integer(MsgSeqNo) ->
777+
rabbit_global_counters:messages_routed(?PROTOCOL, length(QRefs)),
748778
State#{dest => Dst#{unconfirmed =>
749779
rabbit_shovel_confirms:insert(MsgSeqNo, QRefs, Tag, Unconfirmed)}}.
750780

@@ -781,8 +811,10 @@ send_nacks(Rs, Cs, State) ->
781811
send_confirms([], _, State) ->
782812
State;
783813
send_confirms([MsgSeqNo], _, State) ->
814+
rabbit_global_counters:messages_confirmed(?PROTOCOL, 1),
784815
rabbit_shovel_behaviour:ack(MsgSeqNo, false, State);
785816
send_confirms(Cs, Rs, State) ->
817+
rabbit_global_counters:messages_confirmed(?PROTOCOL, length(Cs)),
786818
coalesce_and_send(Cs, Rs,
787819
fun(MsgSeqNo, Multiple, StateX) ->
788820
rabbit_shovel_behaviour:ack(MsgSeqNo, Multiple, StateX)
@@ -833,3 +865,30 @@ decr_remaining(Num, State) ->
833865
_ = send_confirms_and_nacks(State),
834866
exit(R)
835867
end.
868+
869+
messages_acknowledged(complete, QName, QS, MsgIds) ->
870+
case rabbit_queue_type:module(QName, QS) of
871+
{ok, QType} ->
872+
rabbit_global_counters:messages_acknowledged(?PROTOCOL, QType, length(MsgIds));
873+
_ ->
874+
ok
875+
end;
876+
messages_acknowledged(_, _, _, _) ->
877+
ok.
878+
879+
messages_received(AckMode) ->
880+
rabbit_global_counters:messages_received(?PROTOCOL, 1),
881+
case AckMode of
882+
on_confirm ->
883+
rabbit_global_counters:messages_received_confirm(?PROTOCOL, 1);
884+
_ ->
885+
ok
886+
end.
887+
888+
messages_delivered(QName, S0) ->
889+
case rabbit_queue_type:module(QName, S0) of
890+
{ok, QType} ->
891+
rabbit_global_counters:messages_delivered(?PROTOCOL, QType, 1);
892+
_ ->
893+
ok
894+
end.

deps/rabbitmq_shovel/test/local_dynamic_SUITE.erl

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ groups() ->
7777
local_to_local_stream_credit_flow_on_confirm,
7878
local_to_local_stream_credit_flow_on_publish,
7979
local_to_local_stream_credit_flow_no_ack,
80-
local_to_local_simple_uri
80+
local_to_local_simple_uri,
81+
local_to_local_counters
8182
]}
8283
].
8384

@@ -1050,6 +1051,36 @@ local_to_local_simple_uri(Config) ->
10501051
none]),
10511052
shovel_test_utils:await_shovel(Config, ?PARAM).
10521053

1054+
local_to_local_counters(Config) ->
1055+
Src = ?config(srcq, Config),
1056+
Dest = ?config(destq, Config),
1057+
%% Let's restart the node so the counters are reset
1058+
ok = rabbit_ct_broker_helpers:stop_node(Config, 0),
1059+
ok = rabbit_ct_broker_helpers:start_node(Config, 0),
1060+
with_session(
1061+
Config,
1062+
fun (Sess) ->
1063+
?awaitMatch(#{publishers := 0, consumers := 0},
1064+
get_global_counters(Config), 30_000),
1065+
shovel_test_utils:set_param(Config, ?PARAM,
1066+
[{<<"src-protocol">>, <<"local">>},
1067+
{<<"src-queue">>, Src},
1068+
{<<"dest-protocol">>, <<"local">>},
1069+
{<<"dest-queue">>, Dest}
1070+
]),
1071+
?awaitMatch(#{publishers := 1, consumers := 1},
1072+
get_global_counters(Config), 30_000),
1073+
_ = publish_many(Sess, Src, Dest, <<"tag1">>, 150),
1074+
?awaitMatch(#{consumers := 1, publishers := 1,
1075+
messages_received_total := 150,
1076+
messages_received_confirm_total := 150,
1077+
messages_routed_total := 150,
1078+
messages_unroutable_dropped_total := 0,
1079+
messages_unroutable_returned_total := 0,
1080+
messages_confirmed_total := 150},
1081+
get_global_counters(Config), 30_000)
1082+
end).
1083+
10531084
%%----------------------------------------------------------------------------
10541085
with_session(Config, Fun) ->
10551086
with_session(Config, <<"/">>, Fun).
@@ -1217,3 +1248,10 @@ delete_queue(Name, VHost) ->
12171248
_ ->
12181249
ok
12191250
end.
1251+
1252+
get_global_counters(Config) ->
1253+
get_global_counters0(Config, #{protocol => 'local-shovel'}).
1254+
1255+
get_global_counters0(Config, Key) ->
1256+
Overview = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_global_counters, overview, []),
1257+
maps:get(Key, Overview).

0 commit comments

Comments
 (0)