Skip to content

Commit 68b98bf

Browse files
committed
Local shovels: ack messages not routed to any queue using exchanges
1 parent 1c72316 commit 68b98bf

File tree

2 files changed

+50
-18
lines changed

2 files changed

+50
-18
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,13 @@ forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
360360
case AckMode of
361361
no_ack ->
362362
rabbit_shovel_behaviour:decr_remaining(1, State2);
363-
on_confirm ->
363+
on_confirm when length(Queues) > 0 ->
364364
Correlation = maps:get(correlation, Options),
365365
State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}};
366+
on_confirm ->
367+
%% Drop the messages as 0.9.1, no destination available
368+
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
369+
rabbit_shovel_behaviour:decr_remaining(1, State3);
366370
on_publish ->
367371
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
368372
rabbit_shovel_behaviour:decr_remaining(1, State3)

deps/rabbitmq_shovel/test/local_dynamic_cluster_SUITE.erl

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
[
2828
{tests, [], [
2929
local_to_local_dest_down,
30-
local_to_local_multiple_dest_down
30+
local_to_local_multiple_dest_down,
31+
local_to_local_no_destination
3132
]}
3233
].
3334

@@ -40,7 +41,7 @@ init_per_suite(Config0) ->
4041
rabbit_ct_helpers:log_environment(),
4142
Config1 = rabbit_ct_helpers:set_config(Config0, [
4243
{rmq_nodename_suffix, ?MODULE},
43-
{rmq_nodes_count, 2},
44+
{rmq_nodes_count, 3},
4445
{rmq_nodes_clustered, true},
4546
{ignored_crashes, [
4647
"server_initiated_close,404",
@@ -105,13 +106,13 @@ local_to_local_dest_down(Config) ->
105106
]),
106107
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
107108
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
108-
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<>>, <<>>, <<>>],
109-
[<<"local_to_local_dest_down_src">>, <<"10">>, _, _]],
109+
?awaitMatch([[<<"local_to_local_dest_down_dest">>, 0, 0, 0],
110+
[<<"local_to_local_dest_down_src">>, 10, _, _]],
110111
list_queue_messages(Config),
111112
30000),
112113
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
113-
?awaitMatch([[<<"local_to_local_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>],
114-
[<<"local_to_local_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]],
114+
?awaitMatch([[<<"local_to_local_dest_down_dest">>, 10, 10, 0],
115+
[<<"local_to_local_dest_down_src">>, 0, 0, 0]],
115116
list_queue_messages(Config),
116117
30000),
117118
expect_many(Sess, Dest, 10)
@@ -132,30 +133,57 @@ local_to_local_multiple_dest_down(Config) ->
132133
{<<"src-queue">>, Src},
133134
{<<"dest-protocol">>, <<"local">>},
134135
{<<"dest-exchange">>, <<"amq.fanout">>},
135-
{<<"dest-exchange-key">>, Dest}
136+
{<<"dest-exchange-key">>, <<"">>}
136137
]),
137138
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
138139
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
139-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<>>, <<>>, <<>>],
140-
[<<"local_to_local_multiple_dest_down_dest2">>, <<>>, <<>>, <<>>],
141-
[<<"local_to_local_multiple_dest_down_src">>, <<"10">>, _, _]],
140+
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, 0, 0, 0],
141+
[<<"local_to_local_multiple_dest_down_dest2">>, 0, 0, 0],
142+
[<<"local_to_local_multiple_dest_down_src">>, 10, _, _]],
142143
list_queue_messages(Config),
143144
30000),
144145
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
145-
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, <<"10">>, <<"10">>, <<"0">>],
146-
[<<"local_to_local_multiple_dest_down_dest2">>, <<"10">>, <<"10">>, <<"0">>],
147-
[<<"local_to_local_multiple_dest_down_src">>, <<"0">>, <<"0">>, <<"0">>]],
146+
?awaitMatch([[<<"local_to_local_multiple_dest_down_dest">>, N, N, 0],
147+
[<<"local_to_local_multiple_dest_down_dest2">>, M, M, 0],
148+
[<<"local_to_local_multiple_dest_down_src">>, 0, 0, 0]]
149+
when ((N >= 10) and (M >= 10)),
148150
list_queue_messages(Config),
149151
30000),
150152
expect_many(Sess, Dest, 10)
151153
end).
152154

155+
local_to_local_no_destination(Config) ->
156+
Src = ?config(srcq, Config),
157+
Dest = ?config(destq, Config),
158+
declare_queue(Config, 0, <<"/">>, Src),
159+
with_session(
160+
Config,
161+
fun (Sess) ->
162+
shovel_test_utils:set_param(Config, ?PARAM,
163+
[{<<"src-protocol">>, <<"local">>},
164+
{<<"src-queue">>, Src},
165+
{<<"dest-protocol">>, <<"local">>},
166+
{<<"dest-exchange">>, <<"amq.fanout">>},
167+
{<<"dest-exchange-key">>, Dest}
168+
]),
169+
publish_many(Sess, Src, Dest, <<"tag1">>, 10),
170+
?awaitMatch([[<<"local_to_local_no_destination_src">>, 0, 0, 0]],
171+
list_queue_messages(Config),
172+
30000)
173+
end).
174+
153175
%%----------------------------------------------------------------------------
154176
list_queue_messages(Config) ->
155-
lists:sort(
156-
rabbit_ct_broker_helpers:rabbitmqctl_list(
157-
Config, 0,
158-
["list_queues", "name", "messages", "messages_ready", "messages_unacknowledged", "--no-table-headers"])).
177+
[[N, to_int(M), to_int(MR), to_int(MU)]
178+
|| [N, M, MR, MU] <- lists:sort(
179+
rabbit_ct_broker_helpers:rabbitmqctl_list(
180+
Config, 0,
181+
["list_queues", "name", "messages", "messages_ready", "messages_unacknowledged", "--no-table-headers"]))].
182+
183+
to_int(<<>>) ->
184+
0;
185+
to_int(Int) ->
186+
binary_to_integer(Int).
159187

160188
with_session(Config, Fun) ->
161189
with_session(Config, <<"/">>, Fun).

0 commit comments

Comments
 (0)