@@ -762,23 +762,32 @@ temporary_queue_after_partition_recovery_2(Config, QueueDeclare) ->
762762 Majority = Nodes -- [Node2 ],
763763 Timeout = 60000 ,
764764
765- {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (
766- Config , Node2 ),
767- CMRef = erlang :monitor (process , Conn ),
765+ {Conn1 , Ch1 } = rabbit_ct_client_helpers :open_connection_and_channel (
766+ Config , Node2 ),
767+ CMRef1 = erlang :monitor (process , Conn1 ),
768+ {Conn2 , Ch2 } = rabbit_ct_client_helpers :open_connection_and_channel (
769+ Config , Node2 ),
770+ CMRef2 = erlang :monitor (process , Conn2 ),
768771
769772 % % We create an exclusive queue on node 1 and get its PID on the server
770773 % % side.
771- ? assertMatch (# 'queue.declare_ok' {}, amqp_channel :call (Ch , QueueDeclare )),
774+ ? assertMatch (# 'queue.declare_ok' {}, amqp_channel :call (Ch1 , QueueDeclare )),
775+ ? assertMatch (# 'queue.declare_ok' {}, amqp_channel :call (Ch2 , QueueDeclare )),
772776 Queues = rabbit_ct_broker_helpers :rpc (
773777 Config , Node2 , rabbit_amqqueue , list , []),
774- ? assertMatch ([_ ], Queues ),
775- [Queue ] = Queues ,
776- ct :pal (" Queue = ~p " , [Queue ]),
778+ ? assertMatch ([_ , _ ], Queues ),
779+ [Queue1 , Queue2 ] = Queues ,
780+ ct :pal (" Queues = ~p " , [Queues ]),
777781
778- QName = amqqueue :get_name (Queue ),
779- QPid = amqqueue :get_pid (Queue ),
780- QMRef = erlang :monitor (process , QPid ),
781- subscribe (Ch , QName # resource .name ),
782+ QName1 = amqqueue :get_name (Queue1 ),
783+ QPid1 = amqqueue :get_pid (Queue1 ),
784+ QMRef1 = erlang :monitor (process , QPid1 ),
785+ subscribe (Ch1 , QName1 # resource .name ),
786+
787+ QName2 = amqqueue :get_name (Queue2 ),
788+ QPid2 = amqqueue :get_pid (Queue2 ),
789+ QMRef2 = erlang :monitor (process , QPid2 ),
790+ subscribe (Ch2 , QName2 # resource .name ),
782791
783792 lists :foreach (
784793 fun (Node ) ->
@@ -787,61 +796,105 @@ temporary_queue_after_partition_recovery_2(Config, QueueDeclare) ->
787796 clustering_utils :assert_cluster_status ({Nodes , Majority }, Majority ),
788797 clustering_utils :assert_cluster_status ({Nodes , [Node2 ]}, [Node2 ]),
789798
790- % % The queue is still recorded everywhere.
799+ % % The queues are still recorded everywhere.
791800 lists :foreach (
792801 fun (Node ) ->
793- Ret = rabbit_ct_broker_helpers :rpc (
794- Config , Node , rabbit_amqqueue , lookup , [QName ]),
795- ct :pal (" Queue lookup on node ~0p : ~p " , [Node , Ret ]),
796- ? assertEqual ({ok , Queue }, Ret )
802+ Ret1 = rabbit_ct_broker_helpers :rpc (
803+ Config , Node , rabbit_amqqueue , lookup , [QName1 ]),
804+ Ret2 = rabbit_ct_broker_helpers :rpc (
805+ Config , Node , rabbit_amqqueue , lookup , [QName2 ]),
806+ ct :pal (
807+ " Queues lookup on node ~0p :~n ~p~n~p " ,
808+ [Node , Ret1 , Ret2 ]),
809+ ? assertEqual ({ok , Queue1 }, Ret1 ),
810+ ? assertEqual ({ok , Queue2 }, Ret2 )
797811 end , Nodes ),
798812
799813 % % Publich to and consume from the queue.
800814 {_PConn , PCh } = rabbit_ct_client_helpers :open_connection_and_channel (
801815 Config , Node2 ),
802- publish_many (PCh , QName # resource .name , 10 ),
803- consume (10 ),
816+ publish_many (PCh , QName1 # resource .name , 10 ),
817+ publish_many (PCh , QName2 # resource .name , 10 ),
818+ consume (20 ),
819+
820+ % % Close the first consuming client to trigger the queue deletion during
821+ % % the network partition. Because of the network partition, the queue
822+ % % process exits but it couldn't delete the queue record.
823+ _ = rabbit_ct_client_helpers :close_connection_and_channel (
824+ Conn1 , Ch1 ),
825+
826+ KhepriTimeout = rabbit_ct_broker_helpers :rpc (Config , Node2 , khepri_app , get_default_timeout , []),
827+ timer :sleep (KhepriTimeout + 10000 ),
804828
805- % % Close the consuming client to trigger the queue deletion during the
806- % % network partition. Because of the network partition, the queue process
807- % % exits but it couldn't delete the queue record.
829+ % % Close the second consuming client to trigger the queue deletion during
830+ % % the network partition. This time, the partition is solved while the
831+ % % queue process tries to delete the record.
808832 _ = rabbit_ct_client_helpers :close_connection_and_channel (
809- Conn , Ch ),
833+ Conn2 , Ch2 ),
834+
835+ % % We resolve the network partition.
836+ lists :foreach (
837+ fun (Node ) ->
838+ rabbit_ct_broker_helpers :allow_traffic_between (
839+ Node2 , Node )
840+ end , Majority ),
841+ clustering_utils :assert_cluster_status ({Nodes , Nodes }, Nodes ),
810842
811843 receive
812- {'DOWN' , CMRef , _ , _ , Reason1 } ->
813- ct :pal (" Connection ~p exited: ~p " , [Conn , Reason1 ]),
814- ? assertEqual ({shutdown , normal }, Reason1 ),
844+ {'DOWN' , CMRef1 , _ , _ , Reason1_1 } ->
845+ ct :pal (" Connection ~p exited: ~p " , [Conn1 , Reason1_1 ]),
846+ ? assertEqual ({shutdown , normal }, Reason1_1 ),
815847 ok
816848 after Timeout ->
817- ct :fail (" Connection ~p still running" , [Conn ])
849+ ct :fail (" Connection ~p still running" , [Conn1 ])
818850 end ,
819851 receive
820- {'DOWN' , QMRef , _ , _ , Reason } ->
821- ct :pal (" Queue ~p exited: ~p " , [QPid , Reason ]),
822- ? assertEqual (normal , Reason ),
852+ {'DOWN' , QMRef1 , _ , _ , Reason1_2 } ->
853+ ct :pal (" Queue ~p exited: ~p " , [QPid1 , Reason1_2 ]),
854+ ? assertEqual (normal , Reason1_2 ),
823855 ok
824856 after Timeout ->
825- ct :fail (" Queue ~p still running" , [QPid ])
857+ ct :fail (" Queue ~p still running" , [QPid1 ])
826858 end ,
827859
828- % % We resolve the network partition .
860+ % % The first queue was deleted from the metadata store on all nodes .
829861 lists :foreach (
830862 fun (Node ) ->
831- rabbit_ct_broker_helpers :allow_traffic_between (
832- Node2 , Node )
833- end , Majority ),
834- clustering_utils :assert_cluster_status ({Nodes , Nodes }, Nodes ),
863+ ? awaitMatch (
864+ {error , not_found },
865+ begin
866+ Ret = rabbit_ct_broker_helpers :rpc (
867+ Config , Node , rabbit_amqqueue , lookup , [QName1 ]),
868+ ct :pal (" Queue lookup on node ~0p : ~p " , [Node , Ret ]),
869+ Ret
870+ end , Timeout )
871+ end , Nodes ),
872+
873+ receive
874+ {'DOWN' , CMRef2 , _ , _ , Reason2_1 } ->
875+ ct :pal (" Connection ~p exited: ~p " , [Conn2 , Reason2_1 ]),
876+ ? assertEqual ({shutdown , normal }, Reason2_1 ),
877+ ok
878+ after Timeout ->
879+ ct :fail (" Connection ~p still running" , [Conn2 ])
880+ end ,
881+ receive
882+ {'DOWN' , QMRef2 , _ , _ , Reason2_2 } ->
883+ ct :pal (" Queue ~p exited: ~p " , [QPid2 , Reason2_2 ]),
884+ ? assertEqual (normal , Reason2_2 ),
885+ ok
886+ after Timeout ->
887+ ct :fail (" Queue ~p still running" , [QPid2 ])
888+ end ,
835889
836- % % The queue was also deleted from the metadata store on all
837- % % nodes.
890+ % % The second queue was deleted from the metadata store on all nodes.
838891 lists :foreach (
839892 fun (Node ) ->
840893 ? awaitMatch (
841894 {error , not_found },
842895 begin
843896 Ret = rabbit_ct_broker_helpers :rpc (
844- Config , Node , rabbit_amqqueue , lookup , [QName ]),
897+ Config , Node , rabbit_amqqueue , lookup , [QName2 ]),
845898 ct :pal (" Queue lookup on node ~0p : ~p " , [Node , Ret ]),
846899 Ret
847900 end , Timeout )
0 commit comments