@@ -92,7 +92,10 @@ groups() ->
9292 format ,
9393 add_member_2 ,
9494 single_active_consumer_priority_take_over ,
95- single_active_consumer_priority
95+ single_active_consumer_priority ,
96+ force_shrink_member_to_current_member ,
97+ force_all_queues_shrink_member_to_current_member ,
98+ force_vhost_queues_shrink_member_to_current_member
9699 ]
97100 ++ all_tests ()},
98101 {cluster_size_5 , [], [start_queue ,
@@ -1152,6 +1155,151 @@ single_active_consumer_priority(Config) ->
11521155 rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
11531156 ok .
11541157
1158+ force_shrink_member_to_current_member (Config ) ->
1159+ [Server0 , Server1 , Server2 ] =
1160+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1161+
1162+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1163+ QQ = ? config (queue_name , Config ),
1164+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1165+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1166+
1167+ RaName = ra_name (QQ ),
1168+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1169+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1170+
1171+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1172+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1173+ ? assertEqual (3 , length (Nodes0 )),
1174+
1175+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1176+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1177+
1178+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1179+
1180+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1181+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1182+ ? assertEqual (1 , length (Nodes1 )),
1183+
1184+ % % grow queues back to all nodes
1185+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1186+
1187+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1188+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1189+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1190+ ? assertEqual (3 , length (Nodes2 )).
1191+
1192+ force_all_queues_shrink_member_to_current_member (Config ) ->
1193+ [Server0 , Server1 , Server2 ] =
1194+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1195+
1196+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1197+ QQ = ? config (queue_name , Config ),
1198+ AQ = ? config (alt_queue_name , Config ),
1199+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1200+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1201+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1202+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1203+
1204+ QQs = [QQ , AQ ],
1205+
1206+ [begin
1207+ RaName = ra_name (Q ),
1208+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1209+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1210+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1211+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1212+ ? assertEqual (3 , length (Nodes0 ))
1213+ end || Q <- QQs ],
1214+
1215+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1216+ force_all_queues_shrink_member_to_current_member , []),
1217+
1218+ [begin
1219+ RaName = ra_name (Q ),
1220+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1221+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1222+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1223+ ? assertEqual (1 , length (Nodes0 ))
1224+ end || Q <- QQs ],
1225+
1226+ % % grow queues back to all nodes
1227+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1228+
1229+ [begin
1230+ RaName = ra_name (Q ),
1231+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1232+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1233+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1234+ ? assertEqual (3 , length (Nodes0 ))
1235+ end || Q <- QQs ].
1236+
1237+ force_vhost_queues_shrink_member_to_current_member (Config ) ->
1238+ [Server0 , Server1 , Server2 ] =
1239+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1240+
1241+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1242+ QQ = ? config (queue_name , Config ),
1243+ AQ = ? config (alt_queue_name , Config ),
1244+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1245+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1246+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1247+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1248+
1249+ QQs = [QQ , AQ ],
1250+
1251+ VHost1 = <<" /" >>,
1252+ VHost2 = <<" another-vhost" >>,
1253+ VHosts = [VHost1 , VHost2 ],
1254+
1255+ User = ? config (rmq_username , Config ),
1256+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1257+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1258+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1259+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1260+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1261+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1262+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1263+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1264+
1265+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1266+
1267+ [begin
1268+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1269+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1270+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1271+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1272+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1273+ ? assertEqual (3 , length (Nodes0 ))
1274+ end || Q <- QQs , VHost <- VHosts ],
1275+
1276+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1277+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1278+
1279+ [begin
1280+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1281+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1282+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1283+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1284+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1285+ case VHost of
1286+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1287+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1288+ end
1289+ end || Q <- QQs , VHost <- VHosts ],
1290+
1291+ % % grow queues back to all nodes in VHost2 only
1292+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1293+
1294+ [begin
1295+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1296+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1297+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1298+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1299+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1300+ ? assertEqual (3 , length (Nodes0 ))
1301+ end || Q <- QQs , VHost <- VHosts ].
1302+
11551303priority_queue_fifo (Config ) ->
11561304 % % testing: if hi priority messages are published before lo priority
11571305 % % messages they are always consumed first (fifo)
0 commit comments