Skip to content

Commit f19d84b

Browse files
authored
Merge pull request #12195 from rabbitmq/mergify/bp/v4.0.x/pr-12074
Cancel AMQP stream consumer when local stream member is deleted (backport #12074)
2 parents 622e7cf + 66d1294 commit f19d84b

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ rabbitmq_integration_suite(
823823
additional_beam = [
824824
":test_queue_utils_beam",
825825
],
826-
shard_count = 19,
826+
shard_count = 20,
827827
deps = [
828828
"@proper//:erlang_app",
829829
],

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
17471747
{queue_event, QRef,
17481748
{stream_local_member_change, MemberPid}},
17491749
cast} | Efs]};
1750+
(_MNode, #member{state = {running, _, MemberPid},
1751+
role = {replica, _},
1752+
target = deleted}, {_, Efs}) ->
1753+
{MemberPid, [{send_msg, P,
1754+
{queue_event, QRef, deleted_replica},
1755+
cast} | Efs]};
17501756
(_N, _M, Acc) ->
17511757
%% not a replica, nothing to do
17521758
Acc

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
626626
end, #{}, Readers0),
627627
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
628628
handle_event(_QName, eol, #stream_client{name = Name}) ->
629-
{eol, [{unblock, Name}]}.
629+
{eol, [{unblock, Name}]};
630+
handle_event(QName, deleted_replica, State) ->
631+
{ok, State, [{queue_down, QName}]}.
630632

631633
is_recoverable(Q) ->
632634
Node = node(),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all() ->
3434
{group, cluster_size_3},
3535
{group, cluster_size_3_1},
3636
{group, cluster_size_3_2},
37+
{group, cluster_size_3_3},
3738
{group, cluster_size_3_parallel_1},
3839
{group, cluster_size_3_parallel_2},
3940
{group, cluster_size_3_parallel_3},
@@ -79,6 +80,7 @@ groups() ->
7980
{cluster_size_3_2, [], [recover,
8081
declare_with_node_down_1,
8182
declare_with_node_down_2]},
83+
{cluster_size_3_3, [], [consume_while_deleting_replica]},
8284
{cluster_size_3_parallel_1, [parallel], [
8385
delete_replica,
8486
delete_last_replica,
@@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
207209
cluster_size_3_parallel_5 -> 3;
208210
cluster_size_3_1 -> 3;
209211
cluster_size_3_2 -> 3;
212+
cluster_size_3_3 -> 3;
210213
unclustered_size_3_1 -> 3;
211214
unclustered_size_3_2 -> 3;
212215
unclustered_size_3_3 -> 3;
@@ -1649,6 +1652,45 @@ consume_from_replica(Config) ->
16491652
receive_batch(Ch2, 0, 99),
16501653
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
16511654

1655+
consume_while_deleting_replica(Config) ->
1656+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1657+
1658+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1659+
Q = ?config(queue_name, Config),
1660+
1661+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1662+
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1663+
1664+
rabbit_ct_helpers:await_condition(
1665+
fun () ->
1666+
Info = find_queue_info(Config, 1, [online]),
1667+
length(proplists:get_value(online, Info)) == 3
1668+
end),
1669+
1670+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
1671+
qos(Ch2, 10, false),
1672+
1673+
CTag = atom_to_binary(?FUNCTION_NAME),
1674+
subscribe(Ch2, Q, false, 0, CTag),
1675+
1676+
%% Delete replica in node 3
1677+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
1678+
delete_replica, [<<"/">>, Q, Server3]),
1679+
1680+
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
1681+
1682+
%% no messages should be received
1683+
receive
1684+
#'basic.cancel'{consumer_tag = CTag} ->
1685+
ok;
1686+
{_, #amqp_msg{}} ->
1687+
exit(unexpected_message)
1688+
after 30000 ->
1689+
exit(missing_consumer_cancel)
1690+
end,
1691+
1692+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1693+
16521694
consume_credit(Config) ->
16531695
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
16541696

0 commit comments

Comments
 (0)