Skip to content

Commit 8ab204f

Browse files
Merge pull request #14710 from rabbitmq/mergify/bp/v4.2.x/pr-14614
Ensures pending counter in rabbit_shovel_status is always an integer (backport #14614)
2 parents 0c8337c + 0238be5 commit 8ab204f

File tree

5 files changed

+167
-8
lines changed

5 files changed

+167
-8
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
ack/3,
3434
nack/3,
3535
status/1,
36-
forward/3
36+
forward/3,
37+
pending_count/1
3738
]).
3839

3940
%% Function references should not be stored on the metadata store.
@@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
360361
status(_) ->
361362
running.
362363

364+
pending_count(#{dest := Dest}) ->
365+
Pending = maps:get(pending, Dest, queue:new()),
366+
queue:len(Pending).
367+
363368
add_pending(Elem, State = #{dest := Dest}) ->
364369
Pending = maps:get(pending, Dest, queue:new()),
365370
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
ack/3,
3333
nack/3,
3434
status/1,
35-
forward/3
35+
forward/3,
36+
pending_count/1
3637
]).
3738

3839
-import(rabbit_misc, [pget/2, pget/3]).
@@ -317,6 +318,10 @@ status(_) ->
317318
%% Destination not yet connected
318319
ignore.
319320

321+
pending_count(#{dest := Dest}) ->
322+
Pending = maps:get(pending, Dest, []),
323+
length(Pending).
324+
320325
-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
321326
state() | {stop, any()}.
322327
forward(_Tag, _Mc,

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
ack/3,
4343
nack/3,
4444
forward/3,
45-
status/1
45+
status/1,
46+
pending_count/1
4647
]).
4748

4849
-export([
@@ -443,6 +444,9 @@ add_routing(Msg0, Dest) ->
443444
status(_) ->
444445
running.
445446

447+
pending_count(_State) ->
448+
0.
449+
446450
%% Internal
447451

448452
parse_parameter(_, _, none) ->

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
-callback forward(Tag :: tag(), Msg :: mc:state(), state()) ->
8686
state() | {stop, any()}.
8787
-callback status(state()) -> rabbit_shovel_status:shovel_status().
88+
-callback pending_count(state()) -> non_neg_integer().
8889

8990
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
9091
source_config() | dest_config().
@@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) ->
164165
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
165166

166167
-spec metrics(state()) -> rabbit_shovel_status:metrics().
167-
metrics(_State = #{source := Source,
168-
dest := Dest}) ->
168+
metrics(#{source := Source,
169+
dest := #{module := Mod}} = State) ->
169170
#{remaining => maps:get(remaining, Source, unlimited),
170-
remaining_unacked => maps:get(remaining_unacked, Source, 0),
171-
pending => maps:get(pending, Dest, 0),
172-
forwarded => maps:get(forwarded, Dest, 0)}.
171+
remaining_unacked => maps:get(remaining_unacked, Source, 0),
172+
pending => Mod:pending_count(State),
173+
forwarded => maps:get(forwarded, maps:get(dest, State), 0)}.
173174

174175

175176
%% Common functions
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(pending_count_SUITE).
9+
10+
-compile(export_all).
11+
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("rabbit/include/mc.hrl").
14+
-include("../include/rabbit_shovel.hrl").
15+
16+
%%%===================================================================
17+
%%% Common Test callbacks
18+
%%%===================================================================
19+
20+
all() ->
21+
[
22+
{group, pending_count_tests}
23+
].
24+
25+
groups() ->
26+
[
27+
{pending_count_tests, [], [
28+
amqp091_pending_count_empty_queue,
29+
amqp091_pending_count_with_messages,
30+
amqp091_pending_count_after_drain,
31+
amqp10_pending_count_empty_list,
32+
amqp10_pending_count_with_messages,
33+
amqp10_pending_count_after_clear,
34+
local_pending_count_empty_queue,
35+
local_pending_count_after_settle,
36+
behaviour_metrics_includes_pending,
37+
behaviour_pending_count_delegation
38+
]}
39+
].
40+
41+
init_per_suite(Config) ->
42+
Config.
43+
44+
end_per_suite(_Config) ->
45+
ok.
46+
47+
init_per_group(_Group, Config) ->
48+
Config.
49+
50+
end_per_group(_Group, _Config) ->
51+
ok.
52+
53+
init_per_testcase(_TestCase, Config) ->
54+
Config.
55+
56+
end_per_testcase(_TestCase, _Config) ->
57+
meck:unload(),
58+
ok.
59+
60+
%%%===================================================================
61+
%%% Test cases
62+
%%%===================================================================
63+
64+
%% Test AMQP 0.9.1 pending_count functionality
65+
amqp091_pending_count_empty_queue(_Config) ->
66+
%% Test that pending_count returns 0 when no messages are pending
67+
State = #{dest => #{}},
68+
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
69+
70+
amqp091_pending_count_with_messages(_Config) ->
71+
%% Test that pending_count returns correct count when messages are pending
72+
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
73+
State = #{dest => #{pending => PendingQueue}},
74+
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).
75+
76+
amqp091_pending_count_after_drain(_Config) ->
77+
%% Test that pending_count returns 0 after messages are drained
78+
EmptyQueue = queue:new(),
79+
State = #{dest => #{pending => EmptyQueue}},
80+
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
81+
82+
%% Test AMQP 1.0 pending_count functionality
83+
amqp10_pending_count_empty_list(_Config) ->
84+
%% Test that pending_count returns 0 when no messages are pending
85+
State = #{dest => #{}},
86+
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).
87+
88+
amqp10_pending_count_with_messages(_Config) ->
89+
%% Test that pending_count returns correct count when messages are pending
90+
PendingList = [{1, msg1}, {2, msg2}],
91+
State = #{dest => #{pending => PendingList}},
92+
?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)).
93+
94+
amqp10_pending_count_after_clear(_Config) ->
95+
%% Test that pending_count returns 0 after pending list is cleared
96+
State = #{dest => #{pending => []}},
97+
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).
98+
99+
%% Test Local shovel pending_count functionality
100+
local_pending_count_empty_queue(_Config) ->
101+
%% Test that pending_count returns 0 when unacked message queue is empty
102+
EmptyQueue = lqueue:new(),
103+
State = #{source => #{current => #{unacked_message_q => EmptyQueue}}},
104+
?assertEqual(0, rabbit_local_shovel:pending_count(State)).
105+
106+
107+
local_pending_count_after_settle(_Config) ->
108+
%% Test that pending_count returns 0 when state doesn't contain unacked queue
109+
State = #{source => #{current => #{}}},
110+
?assertEqual(0, rabbit_local_shovel:pending_count(State)).
111+
112+
%% Test behaviour module integration
113+
behaviour_metrics_includes_pending(_Config) ->
114+
%% Mock the destination module's pending_count and status functions
115+
meck:new(rabbit_amqp091_shovel, [passthrough]),
116+
meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end),
117+
meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end),
118+
119+
State = #{source => #{remaining => 10, remaining_unacked => 3},
120+
dest => #{module => rabbit_amqp091_shovel, forwarded => 7}},
121+
122+
{_Status, Metrics} = rabbit_shovel_behaviour:status(State),
123+
124+
?assertMatch(#{remaining := 10,
125+
remaining_unacked := 3,
126+
pending := 5,
127+
forwarded := 7}, Metrics),
128+
129+
?assert(meck:validate(rabbit_amqp091_shovel)).
130+
131+
behaviour_pending_count_delegation(_Config) ->
132+
%% Test that the behaviour module correctly delegates to the specific implementation
133+
meck:new(rabbit_amqp10_shovel, [passthrough]),
134+
meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end),
135+
meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end),
136+
137+
State = #{dest => #{module => rabbit_amqp10_shovel}},
138+
139+
%% This would be called indirectly through status/1
140+
{_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{},
141+
dest => maps:get(dest, State)}),
142+
143+
?assertEqual(3, maps:get(pending, Metrics)),
144+
?assert(meck:validate(rabbit_amqp10_shovel)).

0 commit comments

Comments
 (0)