@@ -1286,155 +1286,167 @@ single_active_consumer_priority(Config) ->
12861286
12871287force_shrink_member_to_current_member (Config ) ->
12881288 case rabbit_ct_helpers :is_mixed_versions () of
1289- true ->
1290- {skip , " Should not run in mixed version environments" };
1291- _ ->
1292- [Server0 , Server1 , Server2 ] =
1289+ true ->
1290+ {skip , " Should not run in mixed version environments" };
1291+ _ ->
1292+ [Server0 , Server1 , Server2 ] =
12931293 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
12941294
1295- Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1296- QQ = ? config (queue_name , Config ),
1297- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1298- declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1295+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1296+ QQ = ? config (queue_name , Config ),
1297+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1298+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
12991299
1300- RaName = ra_name (QQ ),
1301- rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1302- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1300+ RaName = ra_name (QQ ),
1301+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1302+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
13031303
1304- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1305- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1306- ? assertEqual (3 , length (Nodes0 )),
1304+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1305+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1306+ ? assertEqual (3 , length (Nodes0 )),
13071307
1308- rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1309- force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1308+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1309+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
13101310
1311- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1311+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
13121312
1313- {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1314- #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1315- ? assertEqual (1 , length (Nodes1 )),
1313+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1314+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1315+ ? assertEqual (1 , length (Nodes1 )),
13161316
1317- % % grow queues back to all nodes
1318- [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1317+ % % grow queues back to all nodes
1318+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
13191319
1320- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1321- {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1322- #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1323- ? assertEqual (3 , length (Nodes2 ))
1320+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1321+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1322+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1323+ ? assertEqual (3 , length (Nodes2 ))
13241324 end .
13251325
13261326force_all_queues_shrink_member_to_current_member (Config ) ->
1327- [Server0 , Server1 , Server2 ] =
1328- rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1327+ case rabbit_ct_helpers :is_mixed_versions () of
1328+ true ->
1329+ {skip , " Should not run in mixed version environments" };
1330+ _ ->
1331+ [Server0 , Server1 , Server2 ] =
1332+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
13291333
1330- Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1331- QQ = ? config (queue_name , Config ),
1332- AQ = ? config (alt_queue_name , Config ),
1333- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1334- declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1335- ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1336- declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1334+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1335+ QQ = ? config (queue_name , Config ),
1336+ AQ = ? config (alt_queue_name , Config ),
1337+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1338+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1339+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1340+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
13371341
1338- QQs = [QQ , AQ ],
1342+ QQs = [QQ , AQ ],
13391343
1340- [begin
1341- RaName = ra_name (Q ),
1342- rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1343- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1344- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1345- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1346- ? assertEqual (3 , length (Nodes0 ))
1347- end || Q <- QQs ],
1344+ [begin
1345+ RaName = ra_name (Q ),
1346+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1347+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1348+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1349+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1350+ ? assertEqual (3 , length (Nodes0 ))
1351+ end || Q <- QQs ],
13481352
1349- rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1350- force_all_queues_shrink_member_to_current_member , []),
1353+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1354+ force_all_queues_shrink_member_to_current_member , []),
13511355
1352- [begin
1353- RaName = ra_name (Q ),
1354- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1355- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1356- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1357- ? assertEqual (1 , length (Nodes0 ))
1358- end || Q <- QQs ],
1356+ [begin
1357+ RaName = ra_name (Q ),
1358+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1359+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1360+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1361+ ? assertEqual (1 , length (Nodes0 ))
1362+ end || Q <- QQs ],
13591363
1360- % % grow queues back to all nodes
1361- [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1364+ % % grow queues back to all nodes
1365+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
13621366
1363- [begin
1364- RaName = ra_name (Q ),
1365- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1366- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1367- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1368- ? assertEqual (3 , length (Nodes0 ))
1369- end || Q <- QQs ].
1367+ [begin
1368+ RaName = ra_name (Q ),
1369+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1370+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1371+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1372+ ? assertEqual (3 , length (Nodes0 ))
1373+ end || Q <- QQs ]
1374+ end .
13701375
13711376force_vhost_queues_shrink_member_to_current_member (Config ) ->
1372- [Server0 , Server1 , Server2 ] =
1373- rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1377+ case rabbit_ct_helpers :is_mixed_versions () of
1378+ true ->
1379+ {skip , " Should not run in mixed version environments" };
1380+ _ ->
1381+ [Server0 , Server1 , Server2 ] =
1382+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
13741383
1375- Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1376- QQ = ? config (queue_name , Config ),
1377- AQ = ? config (alt_queue_name , Config ),
1378- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1379- declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1380- ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1381- declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1384+ Ch0 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1385+ QQ = ? config (queue_name , Config ),
1386+ AQ = ? config (alt_queue_name , Config ),
1387+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1388+ declare (Ch0 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1389+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1390+ declare (Ch0 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
13821391
1383- QQs = [QQ , AQ ],
1392+ QQs = [QQ , AQ ],
13841393
1385- VHost1 = <<" /" >>,
1386- VHost2 = <<" another-vhost" >>,
1387- VHosts = [VHost1 , VHost2 ],
1394+ VHost1 = <<" /" >>,
1395+ VHost2 = <<" another-vhost" >>,
1396+ VHosts = [VHost1 , VHost2 ],
13881397
1389- User = ? config (rmq_username , Config ),
1390- ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1391- ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1392- Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1393- {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1394- ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1395- declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1396- ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1397- declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1398+ User = ? config (rmq_username , Config ),
1399+ ok = rabbit_ct_broker_helpers :add_vhost (Config , Server0 , VHost2 , User ),
1400+ ok = rabbit_ct_broker_helpers :set_full_permissions (Config , User , VHost2 ),
1401+ Conn1 = rabbit_ct_client_helpers :open_unmanaged_connection (Config , Server0 , VHost2 ),
1402+ {ok , Ch1 } = amqp_connection :open_channel (Conn1 ),
1403+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1404+ declare (Ch1 , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1405+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1406+ declare (Ch1 , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
13981407
1399- [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
1408+ [rabbit_ct_client_helpers :publish (Ch , Q , 3 ) || Q <- QQs , Ch <- [Ch0 , Ch1 ]],
14001409
1401- [begin
1402- QQRes = rabbit_misc :r (VHost , queue , Q ),
1403- {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1404- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1405- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1406- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1407- ? assertEqual (3 , length (Nodes0 ))
1408- end || Q <- QQs , VHost <- VHosts ],
1410+ [begin
1411+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1412+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1413+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1414+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1415+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1416+ ? assertEqual (3 , length (Nodes0 ))
1417+ end || Q <- QQs , VHost <- VHosts ],
14091418
1410- rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1411- force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
1419+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1420+ force_vhost_queues_shrink_member_to_current_member , [VHost2 ]),
14121421
1413- [begin
1414- QQRes = rabbit_misc :r (VHost , queue , Q ),
1415- {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1416- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1417- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1418- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1419- case VHost of
1420- VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1421- VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1422- end
1423- end || Q <- QQs , VHost <- VHosts ],
1422+ [begin
1423+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1424+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1425+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1426+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1427+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1428+ case VHost of
1429+ VHost1 -> ? assertEqual (3 , length (Nodes0 ));
1430+ VHost2 -> ? assertEqual (1 , length (Nodes0 ))
1431+ end
1432+ end || Q <- QQs , VHost <- VHosts ],
14241433
1425- % % grow queues back to all nodes in VHost2 only
1426- [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1434+ % % grow queues back to all nodes in VHost2 only
1435+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , VHost2 , <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
14271436
1428- [begin
1429- QQRes = rabbit_misc :r (VHost , queue , Q ),
1430- {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1431- wait_for_messages_ready ([Server0 ], RaName , 3 ),
1432- {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1433- #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1434- ? assertEqual (3 , length (Nodes0 ))
1435- end || Q <- QQs , VHost <- VHosts ].
1437+ [begin
1438+ QQRes = rabbit_misc :r (VHost , queue , Q ),
1439+ {ok , RaName } = rpc :call (Server0 , rabbit_queue_type_util , qname_to_internal_name , [QQRes ]),
1440+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1441+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , VHost ]),
1442+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1443+ ? assertEqual (3 , length (Nodes0 ))
1444+ end || Q <- QQs , VHost <- VHosts ]
1445+ end .
14361446
14371447force_checkpoint_on_queue (Config ) ->
1448+ check_quorum_queues_v4_compat (Config ),
1449+
14381450 [Server0 , Server1 , Server2 ] =
14391451 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
14401452 Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
@@ -1501,6 +1513,8 @@ force_checkpoint_on_queue(Config) ->
15011513 end ).
15021514
15031515force_checkpoint (Config ) ->
1516+ check_quorum_queues_v4_compat (Config ),
1517+
15041518 [Server0 , _Server1 , _Server2 ] =
15051519 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
15061520 Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
@@ -1722,6 +1736,7 @@ subscribe_from_each(Config) ->
17221736 ok .
17231737
17241738dont_leak_file_handles (Config ) ->
1739+ check_quorum_queues_v4_compat (Config ),
17251740
17261741 [Server0 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
17271742
@@ -1770,6 +1785,8 @@ dont_leak_file_handles(Config) ->
17701785 ok .
17711786
17721787gh_12635 (Config ) ->
1788+ check_quorum_queues_v4_compat (Config ),
1789+
17731790 % https://github.com/rabbitmq/rabbitmq-server/issues/12635
17741791 [Server0 , _Server1 , Server2 ] =
17751792 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -3268,6 +3285,8 @@ subscribe_redelivery_limit(Config) ->
32683285 end .
32693286
32703287subscribe_redelivery_limit_disable (Config ) ->
3288+ check_quorum_queues_v4_compat (Config ),
3289+
32713290 [Server | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
32723291
32733292 Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
@@ -3664,6 +3683,8 @@ queue_length_limit_reject_publish(Config) ->
36643683 ok .
36653684
36663685queue_length_limit_policy_cleared (Config ) ->
3686+ check_quorum_queues_v4_compat (Config ),
3687+
36673688 [Server | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
36683689
36693690 Ch = rabbit_ct_client_helpers :open_channel (Config , Server ),
@@ -4723,6 +4744,8 @@ select_nodes_with_least_replicas_node_down(Config) ->
47234744 || Q <- Qs ].
47244745
47254746requeue_multiple_true (Config ) ->
4747+ check_quorum_queues_v4_compat (Config ),
4748+
47264749 Ch = rabbit_ct_client_helpers :open_channel (Config ),
47274750 QQ = ? config (queue_name , Config ),
47284751 ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
@@ -4761,6 +4784,8 @@ requeue_multiple_true(Config) ->
47614784 amqp_channel :call (Ch , # 'queue.delete' {queue = QQ })).
47624785
47634786requeue_multiple_false (Config ) ->
4787+ check_quorum_queues_v4_compat (Config ),
4788+
47644789 Ch = rabbit_ct_client_helpers :open_channel (Config ),
47654790 QQ = ? config (queue_name , Config ),
47664791 ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
0 commit comments