Skip to content

Commit c9fbd13

Browse files
committed
Shovel: add forwarded counter
1 parent 8e67098 commit c9fbd13

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg,
365365
ok = amqp_channel:call(OutboundChan, Method, Msg)
366366
end,
367367

368+
#{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State),
369+
368370
rabbit_shovel_behaviour:decr_remaining_unacked(
369371
case AckMode of
370372
no_ack ->
371-
rabbit_shovel_behaviour:decr_remaining(1, State);
373+
rabbit_shovel_behaviour:decr_remaining(1, State1);
372374
on_confirm ->
373-
State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}};
375+
State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}};
374376
on_publish ->
375-
State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State),
376-
rabbit_shovel_behaviour:decr_remaining(1, State1)
377+
State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1),
378+
rabbit_shovel_behaviour:decr_remaining(1, State2)
377379
end).
378380

379381
control_throttle(State) ->

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
status/1,
3131
% common functions
3232
decr_remaining_unacked/1,
33-
decr_remaining/2
33+
decr_remaining/2,
34+
incr_forwarded/1
3435
]).
3536

3637
-type tag() :: non_neg_integer().
@@ -155,7 +156,18 @@ nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
155156
Mod:nack(Tag, Multi, State).
156157

157158
status(#{dest := #{module := Mod}} = State) ->
158-
Mod:status(State).
159+
{Mod:status(State), metrics(State)}.
160+
161+
incr_forwarded(State = #{dest := Dest}) ->
162+
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
163+
164+
metrics(_State = #{source := Source,
165+
dest := Dest}) ->
166+
#{remaining => maps:get(remaining, Source, unlimited),
167+
remaining_unacked => maps:get(remaining_unacked, Source, 0),
168+
pending => maps:get(pending, Dest, 0),
169+
forwarded => maps:get(forwarded, Dest, 0)}.
170+
159171

160172
%% Common functions
161173

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@
4949
info :: info(),
5050
blocked_status = running :: blocked_status(),
5151
blocked_at :: integer() | undefined,
52+
metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
53+
ramaining_unacked := rabbit_types:option(non_neg_integer()),
54+
pending := rabbit_types:option(non_neg_integer()),
55+
forwarded := rabbit_types:option(non_neg_integer())
56+
},
57+
5258
timestamp :: calendar:datetime()}).
5359

5460
start_link() ->
@@ -112,6 +118,7 @@ handle_call(status, _From, State) ->
112118
{reply, [{Entry#entry.name,
113119
Entry#entry.type,
114120
blocked_status_to_info(Entry),
121+
Entry#entry.metrics,
115122
Entry#entry.timestamp}
116123
|| Entry <- Entries], State};
117124

@@ -120,6 +127,7 @@ handle_call({lookup, Name}, _From, State) ->
120127
[Entry] -> [{name, Name},
121128
{type, Entry#entry.type},
122129
{info, blocked_status_to_info(Entry)},
130+
{metrics, Entry#entry.metrics},
123131
{timestamp, Entry#entry.timestamp}];
124132
[] -> not_found
125133
end,
@@ -141,13 +149,15 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) ->
141149
split_name(Name) ++ split_status(Info)),
142150
{noreply, State};
143151

144-
handle_cast({report_blocked_status, Name, Status, Timestamp}, State) ->
152+
handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) ->
145153
case Status of
146154
flow ->
147155
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow},
156+
{#entry.metrics, Metrics},
148157
{#entry.blocked_at, Timestamp}]);
149158
_ ->
150-
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status}])
159+
true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status},
160+
{#entry.metrics, Metrics}])
151161
end,
152162
{noreply, State};
153163

0 commit comments

Comments
 (0)