Skip to content

Commit 03a2a07

Browse files
committed
wip
1 parent 05545af commit 03a2a07

File tree

1 file changed

+120
-1
lines changed

1 file changed

+120
-1
lines changed

deps/rabbit/test/clustering_recovery_SUITE.erl

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,14 @@ groups() ->
5454
recover_after_partition_with_leader,
5555
autodelete_transient_queue_after_partition_recovery_1,
5656
autodelete_durable_queue_after_partition_recovery_1,
57+
autodelete_transient_queue_after_partition_recovery_2,
58+
autodelete_durable_queue_after_partition_recovery_2,
5759
autodelete_transient_queue_after_node_loss,
5860
autodelete_durable_queue_after_node_loss,
5961
exclusive_transient_queue_after_partition_recovery_1,
6062
exclusive_durable_queue_after_partition_recovery_1,
63+
exclusive_transient_queue_after_partition_recovery_2,
64+
exclusive_durable_queue_after_partition_recovery_2,
6165
exclusive_transient_queue_after_node_loss,
6266
exclusive_durable_queue_after_node_loss
6367
]}
@@ -129,8 +133,12 @@ init_per_testcase(Testcase, Config) ->
129133
Config2 = case Testcase of
130134
_ when Testcase =:= autodelete_transient_queue_after_partition_recovery_1 orelse
131135
Testcase =:= autodelete_durable_queue_after_partition_recovery_1 orelse
136+
Testcase =:= autodelete_transient_queue_after_partition_recovery_2 orelse
137+
Testcase =:= autodelete_durable_queue_after_partition_recovery_2 orelse
132138
Testcase =:= exclusive_transient_queue_after_partition_recovery_1 orelse
133-
Testcase =:= exclusive_durable_queue_after_partition_recovery_1 ->
139+
Testcase =:= exclusive_durable_queue_after_partition_recovery_1 orelse
140+
Testcase =:= exclusive_transient_queue_after_partition_recovery_2 orelse
141+
Testcase =:= exclusive_durable_queue_after_partition_recovery_2 ->
134142
rabbit_ct_helpers:merge_app_env(
135143
Config1,
136144
{rabbit,
@@ -728,6 +736,117 @@ temporary_queue_after_partition_recovery_1(Config, QueueDeclare) ->
728736
ok
729737
end.
730738

739+
autodelete_transient_queue_after_partition_recovery_2(Config) ->
740+
QueueDeclare = #'queue.declare'{auto_delete = true,
741+
durable = false},
742+
temporary_queue_after_partition_recovery_2(Config, QueueDeclare).
743+
744+
autodelete_durable_queue_after_partition_recovery_2(Config) ->
745+
QueueDeclare = #'queue.declare'{auto_delete = true,
746+
durable = true},
747+
temporary_queue_after_partition_recovery_2(Config, QueueDeclare).
748+
749+
exclusive_transient_queue_after_partition_recovery_2(Config) ->
750+
QueueDeclare = #'queue.declare'{exclusive = true,
751+
durable = true},
752+
temporary_queue_after_partition_recovery_2(Config, QueueDeclare).
753+
754+
exclusive_durable_queue_after_partition_recovery_2(Config) ->
755+
QueueDeclare = #'queue.declare'{exclusive = true,
756+
durable = true},
757+
temporary_queue_after_partition_recovery_2(Config, QueueDeclare).
758+
759+
temporary_queue_after_partition_recovery_2(Config, QueueDeclare) ->
760+
[_Node1, Node2 | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(
761+
Config, nodename),
762+
Majority = Nodes -- [Node2],
763+
Timeout = 60000,
764+
765+
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(
766+
Config, Node2),
767+
CMRef = erlang:monitor(process, Conn),
768+
769+
%% We create an exclusive queue on node 1 and get its PID on the server
770+
%% side.
771+
?assertMatch(#'queue.declare_ok'{}, amqp_channel:call(Ch, QueueDeclare)),
772+
Queues = rabbit_ct_broker_helpers:rpc(
773+
Config, Node2, rabbit_amqqueue, list, []),
774+
?assertMatch([_], Queues),
775+
[Queue] = Queues,
776+
ct:pal("Queue = ~p", [Queue]),
777+
778+
QName = amqqueue:get_name(Queue),
779+
QPid = amqqueue:get_pid(Queue),
780+
QMRef = erlang:monitor(process, QPid),
781+
subscribe(Ch, QName#resource.name),
782+
783+
lists:foreach(
784+
fun(Node) ->
785+
rabbit_ct_broker_helpers:block_traffic_between(Node2, Node)
786+
end, Majority),
787+
clustering_utils:assert_cluster_status({Nodes, Majority}, Majority),
788+
clustering_utils:assert_cluster_status({Nodes, [Node2]}, [Node2]),
789+
790+
%% The queue is still recorded everywhere.
791+
lists:foreach(
792+
fun(Node) ->
793+
Ret = rabbit_ct_broker_helpers:rpc(
794+
Config, Node, rabbit_amqqueue, lookup, [QName]),
795+
ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]),
796+
?assertEqual({ok, Queue}, Ret)
797+
end, Nodes),
798+
799+
%% Publich to and consume from the queue.
800+
{_PConn, PCh} = rabbit_ct_client_helpers:open_connection_and_channel(
801+
Config, Node2),
802+
publish_many(PCh, QName#resource.name, 10),
803+
consume(10),
804+
805+
%% Close the consuming client to trigger the queue deletion during the
806+
%% network partition.
807+
_ = rabbit_ct_client_helpers:close_connection_and_channel(
808+
Conn, Ch),
809+
810+
%% We resolve the network partition.
811+
lists:foreach(
812+
fun(Node) ->
813+
rabbit_ct_broker_helpers:allow_traffic_between(
814+
Node2, Node)
815+
end, Majority),
816+
clustering_utils:assert_cluster_status({Nodes, Nodes}, Nodes),
817+
818+
receive
819+
{'DOWN', CMRef, _, _, Reason1} ->
820+
ct:pal("Connection ~p exited: ~p", [Conn, Reason1]),
821+
?assertEqual({shutdown, normal}, Reason1),
822+
ok
823+
after Timeout ->
824+
ct:fail("Connection ~p still running", [Conn])
825+
end,
826+
receive
827+
{'DOWN', QMRef, _, _, Reason} ->
828+
ct:pal("Queue ~p exited: ~p", [QPid, Reason]),
829+
?assertEqual(normal, Reason),
830+
ok
831+
after Timeout ->
832+
ct:fail("Queue ~p still running", [QPid])
833+
end,
834+
835+
%% The queue was also deleted from the metadata store on all
836+
%% nodes.
837+
lists:foreach(
838+
fun(Node) ->
839+
?awaitMatch(
840+
{error, not_found},
841+
begin
842+
Ret = rabbit_ct_broker_helpers:rpc(
843+
Config, Node, rabbit_amqqueue, lookup, [QName]),
844+
ct:pal("Queue lookup on node ~0p: ~p", [Node, Ret]),
845+
Ret
846+
end, Timeout)
847+
end, Nodes),
848+
ok.
849+
731850
autodelete_transient_queue_after_node_loss(Config) ->
732851
QueueDeclare = #'queue.declare'{auto_delete = true,
733852
durable = false},

0 commit comments

Comments
 (0)