From f0776c8b972dd271ab36825d8e364dc2f0e10829 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 24 Jun 2025 12:09:21 +0200 Subject: [PATCH] Add log statements stream network partitions The test creates network partitions and checks how the stream SAC coordinator deals with them. It can be flaky on CI, the log statements should help diagnose the flakiness. (cherry picked from commit 066145763f0e3ae425b5ceb62c2f20706db65c4b) --- .../test/rabbit_stream_partitions_SUITE.erl | 91 +++++++++++++++++-- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl index 6f12bbeed027..956bd899f2df 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_partitions_SUITE.erl @@ -12,7 +12,7 @@ %% %% The Initial Developer of the Original Code is Pivotal Software, Inc. %% Copyright (c) 2025 Broadcom. All Rights Reserved. -%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_stream_partitions_SUITE). @@ -107,6 +107,8 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> %% another node will be isolated ?assertEqual(L#node.name, coordinator_leader(Config)), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(Config, 0), {ok, So1, C1_00} = stream_test_utils:connect(Config, 1), {ok, So2, C2_00} = stream_test_utils:connect(Config, 2), @@ -135,18 +137,24 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, LN, S), wait_for_presumed_down_consumer(Config, LN, S), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), wait_for_all_consumers_connected(Config, LN, S), Consumers2 = query_consumers(Config, LN, S), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted assertSize(2, Consumers2), @@ -157,21 +165,28 @@ simple_sac_consumer_should_get_disconnected_on_network_partition(Config) -> %% assert the cancelled consumer received a metadata update frame SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting stream"), delete_stream(stream_port(Config, 0), S), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue the this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), @@ -190,6 +205,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(Config, CL), {ok, So1, C1_00} = stream_test_utils:connect(Config, CF1), {ok, So2, C2_00} = stream_test_utils:connect(Config, CF2), @@ -216,12 +233,16 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF1), rabbit_ct_broker_helpers:block_traffic_between(Isolated, CF2), wait_for_disconnected_consumer(Config, NotIsolated, S), wait_for_presumed_down_consumer(Config, NotIsolated, S), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF1), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, CF2), @@ -231,6 +252,8 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co Consumers2 = query_consumers(Config, NotIsolated, S), + log("Consumers after partition resolution ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted assertSize(2, Consumers2), @@ -246,26 +269,35 @@ simple_sac_consumer_should_get_disconnected_on_coord_leader_network_partition(Co SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) when K == ActiveSubId -> + log("Expecting consumer update for promoted consumer"), %% promoted consumer should have received consumer update C1 = receive_consumer_update_and_respond(S0, C0), + log("Received consumer update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting stream"), delete_stream(L#node.stream_port, S), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), @@ -286,6 +318,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - %% another node will be isolated ?assertEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port), {ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port), {ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port), @@ -315,12 +349,16 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, NotIsolated, Partition), wait_for_presumed_down_consumer(Config, NotIsolated, Partition), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), @@ -329,6 +367,8 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - wait_for_all_consumers_connected(Config, NotIsolated, Partition), Consumers2 = query_consumers(Config, NotIsolated, Partition), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted @@ -340,22 +380,29 @@ super_stream_sac_consumer_should_get_disconnected_on_network_partition(Config) - SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting super stream"), delete_super_stream(L#node.stream_port, Ss), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> - {_, C1} = receive_commands(S0, C0), + log("Expecting frame in consumer ~p", [K]), + {Cmd1, C1} = receive_commands(S0, C0), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), ok. @@ -374,6 +421,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit %% the coordinator leader node will be isolated ?assertNotEqual(L#node.name, CL), + log("Stream leader and coordinator leader are on ~p", [L#node.name]), + {ok, So0, C0_00} = stream_test_utils:connect(L#node.stream_port), {ok, So1, C1_00} = stream_test_utils:connect(F1#node.stream_port), {ok, So2, C2_00} = stream_test_utils:connect(F2#node.stream_port), @@ -410,12 +459,16 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit end, Consumers1), #consumer{subscription_id = DiscSubId} = DisconnectedConsumer, + log("Isolating node ~p", [Isolated]), + rabbit_ct_broker_helpers:block_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:block_traffic_between(Isolated, F2N), wait_for_disconnected_consumer(Config, NotIsolated, Partition), wait_for_presumed_down_consumer(Config, NotIsolated, Partition), + log("Node ~p rejoins cluster", [Isolated]), + rabbit_ct_broker_helpers:allow_traffic_between(Isolated, LN), rabbit_ct_broker_helpers:allow_traffic_between(Isolated, F2N), @@ -424,6 +477,8 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit wait_for_all_consumers_connected(Config, NotIsolated, Partition), Consumers2 = query_consumers(Config, NotIsolated, Partition), + log("Consumers after partition resolution: ~p", [Consumers2]), + log("Disconnected consumer: ~p", [DisconnectedConsumer]), %% the disconnected, then presumed down consumer is cancelled, %% because the stream member on its node has been restarted @@ -440,27 +495,35 @@ super_stream_sac_consumer_should_get_disconnected_on_coord_leader_network_partit SubIdToState1 = maps:fold(fun(K, {S0, C0}, Acc) when K == DiscSubId -> + log("Expecting metadata update for disconnected consumer"), %% cancelled consumer received a metadata update C1 = receive_metadata_update(S0, C0), + log("Received metadata update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) when K == ActiveSubId -> + log("Expecting consumer update for promoted consumer"), %% promoted consumer should have received consumer update C1 = receive_consumer_update_and_respond(S0, C0), + log("Received consumer update"), Acc#{K => {S0, C1}}; (K, {S0, C0}, Acc) -> Acc#{K => {S0, C0}} end, #{}, SubIdToState0), + log("Deleting super stream"), delete_super_stream(L#node.stream_port, Ss), %% online consumers should receive a metadata update frame (stream deleted) %% we unqueue this frame before closing the connection %% directly closing the connection of the cancelled consumer maps:foreach(fun(K, {S0, C0}) when K /= DiscSubId -> + log("Expecting frame in consumer ~p", [K]), {Cmd1, C1} = receive_commands(S0, C0), - ct:pal("Received command: ~p", [Cmd1]), + log("Received ~p", [Cmd1]), + log("Closing"), {ok, _} = stream_test_utils:close(S0, C1); - (_, {S0, C0}) -> + (K, {S0, C0}) -> + log("Closing ~p", [K]), {ok, _} = stream_test_utils:close(S0, C0) end, SubIdToState1), ok. @@ -727,6 +790,7 @@ wait_for_disconnected_consumer(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting a disconnected consumer: ~p", [Cs]), lists:any(fun(#consumer{status = {disconnected, _}}) -> true; (_) -> @@ -738,6 +802,7 @@ wait_for_presumed_down_consumer(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting a presumed-down consumer: ~p", [Cs]), lists:any(fun(#consumer{status = {presumed_down, _}}) -> true; (_) -> @@ -749,6 +814,7 @@ wait_for_all_consumers_connected(Config, Node, Stream) -> rabbit_ct_helpers:await_condition( fun() -> Cs = query_consumers(Config, Node, Stream), + log("Expecting connected consumers: ~p", [Cs]), lists:all(fun(#consumer{status = {connected, _}}) -> true; (_) -> @@ -761,6 +827,7 @@ wait_for_coordinator_ready(Config) -> rabbit_ct_helpers:await_condition( fun() -> Status = coordinator_status(Config), + log("Coordinator status: ~p", [Status]), lists:all(fun(St) -> RS = proplists:get_value(<<"Raft State">>, St, undefined), @@ -785,3 +852,9 @@ assertSize(Expected, List) when is_list(List) -> assertEmpty(Data) -> assertSize(0, Data). + +log(Format) -> + ct:pal(Format). + +log(Format, Args) -> + ct:pal(Format, Args).