Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

%% Function references should not be stored on the metadata store.
Expand Down Expand Up @@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
status(_) ->
running.

pending_count(#{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
queue:len(Pending).

add_pending(Elem, State = #{dest := Dest}) ->
Pending = maps:get(pending, Dest, queue:new()),
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.
Expand Down
7 changes: 6 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
ack/3,
nack/3,
status/1,
forward/3
forward/3,
pending_count/1
]).

-import(rabbit_misc, [pget/2, pget/3]).
Expand Down Expand Up @@ -317,6 +318,10 @@ status(_) ->
%% Destination not yet connected
ignore.

pending_count(#{dest := Dest}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending messages in AMQP0.9.1 are counted in the destination side, thus in AMQP1.0 shovels they should be dest -> unacked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my understanding is that in case of AMQP 1.0 the dest -> unacked field is only used in on-confirm ack-mode. But there can be pending messages in case of other ack-modes too when there is no more link credit to send the message to the dest. In this case the whole message is buffered in the shovel process. My understanding is that the pending metric only counts the number of buffered ie unsent messages. Unacked additionally counts messages that were sent but not yet acked by the dest.

There is also the metric remaining_unacked, but I'm not sure it is counted correctly in case of on-confirm. It is decremented when the message is sent (the same way as for other ack-modes) and not when the message is accepted/rejected by the dest.

Pending = maps:get(pending, Dest, []),
length(Pending).

-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
state() | {stop, any()}.
forward(_Tag, _Mc,
Expand Down
8 changes: 7 additions & 1 deletion deps/rabbitmq_shovel/src/rabbit_local_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
ack/3,
nack/3,
forward/3,
status/1
status/1,
pending_count/1
]).

-export([
Expand Down Expand Up @@ -437,6 +438,11 @@ add_routing(Msg0, Dest) ->
status(_) ->
running.

pending_count(#{source := #{current := #{unacked_message_q := UAMQ}}}) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending messages in AMQP0.9.1 are counted in the destination side, thus in local shovels they should be dest -> unconfirmed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if I see it correctly there is no flow control between a local shovel and a queue so there can be no pending aka buffered messages for local shovels (according to my above definition of pending).

dest -> unconfirmed is only non-zero in case of on-confirm ack-mode. source -> unacked_message_q can be non-empty for on-confirm and on-publish.

?QUEUE:len(UAMQ);
pending_count(_State) ->
0.

%% Internal

parse_parameter(_, _, none) ->
Expand Down
11 changes: 6 additions & 5 deletions deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
-callback forward(Tag :: tag(), Msg :: mc:state(), state()) ->
state() | {stop, any()}.
-callback status(state()) -> rabbit_shovel_status:shovel_status().
-callback pending_count(state()) -> non_neg_integer().

-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config().
Expand Down Expand Up @@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) ->
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.

-spec metrics(state()) -> rabbit_shovel_status:metrics().
metrics(_State = #{source := Source,
dest := Dest}) ->
metrics(#{source := Source,
dest := #{module := Mod}} = State) ->
#{remaining => maps:get(remaining, Source, unlimited),
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => maps:get(pending, Dest, 0),
forwarded => maps:get(forwarded, Dest, 0)}.
remaining_unacked => maps:get(remaining_unacked, Source, 0),
pending => Mod:pending_count(State),
forwarded => maps:get(forwarded, maps:get(dest, State), 0)}.


%% Common functions
Expand Down
175 changes: 175 additions & 0 deletions deps/rabbitmq_shovel/test/pending_count_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(pending_count_SUITE).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("../include/rabbit_shovel.hrl").

%%%===================================================================
%%% Common Test callbacks
%%%===================================================================

all() ->
[
{group, pending_count_tests}
].

groups() ->
[
{pending_count_tests, [], [
amqp091_pending_count_empty_queue,
amqp091_pending_count_with_messages,
amqp091_pending_count_after_drain,
amqp10_pending_count_empty_list,
amqp10_pending_count_with_messages,
amqp10_pending_count_after_clear,
local_pending_count_empty_queue,
local_pending_count_with_messages,
local_pending_count_after_settle,
behaviour_metrics_includes_pending,
behaviour_pending_count_delegation
]}
].

init_per_suite(Config) ->
Config.

end_per_suite(_Config) ->
ok.

init_per_group(_Group, Config) ->
Config.

end_per_group(_Group, _Config) ->
ok.

init_per_testcase(_TestCase, Config) ->
Config.

end_per_testcase(_TestCase, _Config) ->
meck:unload(),
ok.

%%%===================================================================
%%% Test cases
%%%===================================================================

%% Test AMQP 0.9.1 pending_count functionality
amqp091_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
State = #{dest => #{pending => PendingQueue}},
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).

amqp091_pending_count_after_drain(_Config) ->
%% Test that pending_count returns 0 after messages are drained
EmptyQueue = queue:new(),
State = #{dest => #{pending => EmptyQueue}},
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).

%% Test AMQP 1.0 pending_count functionality
amqp10_pending_count_empty_list(_Config) ->
%% Test that pending_count returns 0 when no messages are pending
State = #{dest => #{}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count when messages are pending
PendingList = [{1, msg1}, {2, msg2}],
State = #{dest => #{pending => PendingList}},
?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)).

amqp10_pending_count_after_clear(_Config) ->
%% Test that pending_count returns 0 after pending list is cleared
State = #{dest => #{pending => []}},
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).

%% Test Local shovel pending_count functionality
local_pending_count_empty_queue(_Config) ->
%% Test that pending_count returns 0 when unacked message queue is empty
EmptyQueue = lqueue:new(),
State = #{source => #{current => #{unacked_message_q => EmptyQueue}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).

local_pending_count_with_messages(_Config) ->
%% Test that pending_count returns correct count from unacked message queue
UnackedQueue = lqueue:from_list([msg1, msg2, msg3, msg4]),
State = #{source => #{current => #{unacked_message_q => UnackedQueue}}},
?assertEqual(4, rabbit_local_shovel:pending_count(State)).

local_pending_count_after_settle(_Config) ->
%% Test that pending_count returns 0 when state doesn't contain unacked queue
State = #{source => #{current => #{}}},
?assertEqual(0, rabbit_local_shovel:pending_count(State)).

%% Test behaviour module integration
behaviour_metrics_includes_pending(_Config) ->
%% Mock the destination module's pending_count and status functions
meck:new(rabbit_amqp091_shovel, [passthrough]),
meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end),
meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end),

State = #{source => #{remaining => 10, remaining_unacked => 3},
dest => #{module => rabbit_amqp091_shovel, forwarded => 7}},

{_Status, Metrics} = rabbit_shovel_behaviour:status(State),

?assertMatch(#{remaining := 10,
remaining_unacked := 3,
pending := 5,
forwarded := 7}, Metrics),

?assert(meck:validate(rabbit_amqp091_shovel)).

behaviour_pending_count_delegation(_Config) ->
%% Test that the behaviour module correctly delegates to the specific implementation
meck:new(rabbit_amqp10_shovel, [passthrough]),
meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end),
meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end),

State = #{dest => #{module => rabbit_amqp10_shovel}},

%% This would be called indirectly through status/1
{_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{},
dest => maps:get(dest, State)}),

?assertEqual(3, maps:get(pending, Metrics)),
?assert(meck:validate(rabbit_amqp10_shovel)).

%%%===================================================================
%%% Integration tests for pending_count behavior in different scenarios
%%%===================================================================

%% Additional test cases to verify pending_count behavior in realistic scenarios
%% These could be added if we want to test the actual message flow scenarios

pending_count_during_flow_control(_Config) ->
%% Test case outline: Verify pending_count increases when flow control blocks forwarding
%% and decreases when flow control is lifted
%% This would require more complex setup with actual message handling
ok.

pending_count_with_multiple_ack_modes(_Config) ->
%% Test case outline: Verify pending_count behaves correctly across different ack modes
%% (no_ack, on_publish, on_confirm)
ok.

pending_count_edge_cases(_Config) ->
%% Test case outline: Test edge cases like:
%% - Missing dest/source maps
%% - Malformed pending data structures
%% - Very large pending counts
ok.