Skip to content

Commit c2569d2

Browse files
ikavgomichaelklishin
authored andcommitted
RMQ-1263: Shovels forward counter - fix dialyzer
(cherry picked from commit af22cf427a7054d93b3dd64fda01a86649fdd7c5)
1 parent d4c1121 commit c2569d2

File tree

4 files changed

+16
-12
lines changed

4 files changed

+16
-12
lines changed

deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
6363
undefined ->
6464
{error, rabbit_data_coercion:to_binary(ErrMsg)};
6565
Match ->
66-
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match,
66+
{{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
6767
{_, HostingNode} = lists:keyfind(node, 1, Opts),
6868
case rabbit_misc:rpc_call(
6969
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@
8383
-callback forward(Tag :: tag(), Props :: #{atom() => any()},
8484
Payload :: binary(), state()) ->
8585
state() | {stop, any()}.
86-
-callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore.
86+
-callback status(state()) -> rabbit_shovel_status:shovel_status().
8787

8888
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
8989
source_config() | dest_config().
@@ -155,12 +155,14 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
155155
nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
156156
Mod:nack(Tag, Multi, State).
157157

158+
-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}.
158159
status(#{dest := #{module := Mod}} = State) ->
159160
{Mod:status(State), metrics(State)}.
160161

161162
incr_forwarded(State = #{dest := Dest}) ->
162163
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
163164

165+
-spec metrics(state()) -> rabbit_shovel_status:metrics().
164166
metrics(_State = #{source := Source,
165167
dest := Dest}) ->
166168
#{remaining => maps:get(remaining, Source, unlimited),

deps/rabbitmq_shovel/src/rabbit_shovel_status.erl

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,26 @@
3636
| {running, proplists:proplist()}
3737
| {terminated, term()}.
3838
-type blocked_status() :: running | flow | blocked.
39+
-type shovel_status() :: blocked_status() | ignore.
3940

4041
-type name() :: binary() | {rabbit_types:vhost(), binary()}.
4142
-type type() :: static | dynamic.
42-
-type status_tuple() :: {name(), type(), info(), calendar:datetime()}.
43+
-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
44+
remaining_unacked := rabbit_types:option(non_neg_integer()),
45+
pending := rabbit_types:option(non_neg_integer()),
46+
forwarded := rabbit_types:option(non_neg_integer())
47+
} | #{}.
48+
-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}.
4349

44-
-export_type([info/0, blocked_status/0]).
50+
-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]).
4551

4652
-record(state, {timer}).
4753
-record(entry, {name :: name(),
4854
type :: type(),
4955
info :: info(),
5056
blocked_status = running :: blocked_status(),
5157
blocked_at :: integer() | undefined,
52-
metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
53-
ramaining_unacked := rabbit_types:option(non_neg_integer()),
54-
pending := rabbit_types:option(non_neg_integer()),
55-
forwarded := rabbit_types:option(non_neg_integer())
56-
},
58+
metrics = #{} :: metrics(),
5759

5860
timestamp :: calendar:datetime()}).
5961

@@ -64,7 +66,7 @@ start_link() ->
6466
report(Name, Type, Info) ->
6567
gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).
6668

67-
-spec report_blocked_status(name(), blocked_status()) -> ok.
69+
-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok.
6870
report_blocked_status(Name, Status) ->
6971
gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}).
7072

deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
2222
type :: static | dynamic,
2323
config :: rabbit_shovel_behaviour:state(),
24-
last_reported_status = running :: rabbit_shovel_status:blocked_status()}).
24+
last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}).
2525

2626
start_link(Type, Name, Config) ->
2727
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
@@ -224,7 +224,7 @@ human_readable_name(Name) ->
224224
maybe_report_blocked_status(#state{config = Config,
225225
last_reported_status = LastStatus} = State) ->
226226
case rabbit_shovel_behaviour:status(Config) of
227-
ignore ->
227+
{ignore, _} ->
228228
State;
229229
LastStatus ->
230230
State;

0 commit comments

Comments
 (0)