Skip to content

Commit 0061944

Browse files
committed
Cancel AMQP stream consumer when local stream member is deleted
The consumer reader process is gone and there is no way to recover it as the node does not have a member of the stream anymore, so it should be cancelled/detached.
1 parent 7697f76 commit 0061944

File tree

4 files changed

+52
-2
lines changed

4 files changed

+52
-2
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ rabbitmq_integration_suite(
832832
additional_beam = [
833833
":test_queue_utils_beam",
834834
],
835-
shard_count = 19,
835+
shard_count = 20,
836836
deps = [
837837
"@proper//:erlang_app",
838838
],

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1747,6 +1747,12 @@ eval_listener({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0},
17471747
{queue_event, QRef,
17481748
{stream_local_member_change, MemberPid}},
17491749
cast} | Efs]};
1750+
(_MNode, #member{state = {running, _, MemberPid},
1751+
role = {replica, _},
1752+
target = deleted}, {_, Efs}) ->
1753+
{MemberPid, [{send_msg, P,
1754+
{queue_event, QRef, deleted_replica},
1755+
cast} | Efs]};
17501756
(_N, _M, Acc) ->
17511757
%% not a replica, nothing to do
17521758
Acc

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,9 @@ handle_event(_QName, {stream_local_member_change, Pid},
626626
end, #{}, Readers0),
627627
{ok, State#stream_client{local_pid = Pid, readers = Readers1}, []};
628628
handle_event(_QName, eol, #stream_client{name = Name}) ->
629-
{eol, [{unblock, Name}]}.
629+
{eol, [{unblock, Name}]};
630+
handle_event(QName, deleted_replica, State) ->
631+
{ok, State, [{queue_down, QName}]}.
630632

631633
is_recoverable(Q) ->
632634
Node = node(),

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all() ->
3434
{group, cluster_size_3},
3535
{group, cluster_size_3_1},
3636
{group, cluster_size_3_2},
37+
{group, cluster_size_3_3},
3738
{group, cluster_size_3_parallel_1},
3839
{group, cluster_size_3_parallel_2},
3940
{group, cluster_size_3_parallel_3},
@@ -79,6 +80,7 @@ groups() ->
7980
{cluster_size_3_2, [], [recover,
8081
declare_with_node_down_1,
8182
declare_with_node_down_2]},
83+
{cluster_size_3_3, [], [consume_while_deleting_replica]},
8284
{cluster_size_3_parallel_1, [parallel], [
8385
delete_replica,
8486
delete_last_replica,
@@ -207,6 +209,7 @@ init_per_group1(Group, Config) ->
207209
cluster_size_3_parallel_5 -> 3;
208210
cluster_size_3_1 -> 3;
209211
cluster_size_3_2 -> 3;
212+
cluster_size_3_3 -> 3;
210213
unclustered_size_3_1 -> 3;
211214
unclustered_size_3_2 -> 3;
212215
unclustered_size_3_3 -> 3;
@@ -1649,6 +1652,45 @@ consume_from_replica(Config) ->
16491652
receive_batch(Ch2, 0, 99),
16501653
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
16511654

1655+
consume_while_deleting_replica(Config) ->
1656+
[Server1, _, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
1657+
1658+
Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1),
1659+
Q = ?config(queue_name, Config),
1660+
1661+
?assertEqual({'queue.declare_ok', Q, 0, 0},
1662+
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
1663+
1664+
rabbit_ct_helpers:await_condition(
1665+
fun () ->
1666+
Info = find_queue_info(Config, 1, [online]),
1667+
length(proplists:get_value(online, Info)) == 3
1668+
end),
1669+
1670+
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server3),
1671+
qos(Ch2, 10, false),
1672+
1673+
CTag = atom_to_binary(?FUNCTION_NAME),
1674+
subscribe(Ch2, Q, false, 0, CTag),
1675+
1676+
%% Delete replica in node 3
1677+
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_queue,
1678+
delete_replica, [<<"/">>, Q, Server3]),
1679+
1680+
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
1681+
1682+
%% no messages should be received
1683+
receive
1684+
#'basic.cancel'{consumer_tag = CTag} ->
1685+
ok;
1686+
{_, #amqp_msg{}} ->
1687+
exit(unexpected_message)
1688+
after 30000 ->
1689+
exit(missing_consumer_cancel)
1690+
end,
1691+
1692+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
1693+
16521694
consume_credit(Config) ->
16531695
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
16541696

0 commit comments

Comments
 (0)