@@ -92,7 +92,15 @@ groups() ->
9292 leader_locator_policy ,
9393 status ,
9494 format ,
95+ <<<<<<< HEAD
9596 add_member_2
97+ =======
98+ add_member_2 ,
99+ single_active_consumer_priority_take_over ,
100+ single_active_consumer_priority ,
101+ force_shrink_member_to_current_member ,
102+ force_all_queues_shrink_member_to_current_member
103+ >>>>>>> 10 dbde1f71 (QQ tests for force - shrink to current member operations )
96104 ]
97105 ++ all_tests ()},
98106 {cluster_size_5 , [], [start_queue ,
@@ -992,6 +1000,267 @@ consume_in_minority(Config) ->
9921000 rabbit_quorum_queue :restart_server ({RaName , Server2 }),
9931001 ok .
9941002
1003+ <<<<<<< HEAD
1004+ =======
1005+ single_active_consumer_priority_take_over (Config ) ->
1006+ check_quorum_queues_v4_compat (Config ),
1007+
1008+ [Server0 , Server1 , _Server2 ] =
1009+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1010+ Ch1 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1011+ Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
1012+ QName = ? config (queue_name , Config ),
1013+ Q1 = <<QName /binary , " _1" >>,
1014+ RaNameQ1 = binary_to_atom (<<" %2F" , " _" , Q1 /binary >>, utf8 ),
1015+ QueryFun = fun rabbit_fifo :query_single_active_consumer /1 ,
1016+ Args = [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
1017+ {<<" x-single-active-consumer" >>, bool , true }],
1018+ ? assertEqual ({'queue.declare_ok' , Q1 , 0 , 0 }, declare (Ch1 , Q1 , Args )),
1019+ ok = subscribe (Ch1 , Q1 , false , <<" ch1-ctag1" >>, [{" x-priority" , byte , 1 }]),
1020+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag1" >>, _ }}}, _ },
1021+ rpc :call (Server0 , ra , local_query , [RaNameQ1 , QueryFun ])),
1022+ # 'confirm.select_ok' {} = amqp_channel :call (Ch2 , # 'confirm.select' {}),
1023+ publish_confirm (Ch2 , Q1 ),
1024+ % % higher priority consumer attaches
1025+ ok = subscribe (Ch2 , Q1 , false , <<" ch2-ctag1" >>, [{" x-priority" , byte , 3 }]),
1026+
1027+ % % Q1 should still have Ch1 as consumer as it has pending messages
1028+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag1" >>, _ }}}, _ },
1029+ rpc :call (Server0 , ra , local_query ,
1030+ [RaNameQ1 , QueryFun ])),
1031+
1032+ % % ack the message
1033+ receive
1034+ {# 'basic.deliver' {consumer_tag = <<" ch1-ctag1" >>,
1035+ delivery_tag = DeliveryTag }, _ } ->
1036+ amqp_channel :cast (Ch1 , # 'basic.ack' {delivery_tag = DeliveryTag ,
1037+ multiple = false })
1038+ after 5000 ->
1039+ flush (1 ),
1040+ exit (basic_deliver_timeout )
1041+ end ,
1042+
1043+ ? awaitMatch ({ok , {_ , {value , {<<" ch2-ctag1" >>, _ }}}, _ },
1044+ rpc :call (Server0 , ra , local_query , [RaNameQ1 , QueryFun ]),
1045+ ? DEFAULT_AWAIT ),
1046+ ok .
1047+
1048+ single_active_consumer_priority (Config ) ->
1049+ check_quorum_queues_v4_compat (Config ),
1050+ [Server0 , Server1 , Server2 ] =
1051+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1052+
1053+ Ch1 = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1054+ Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server1 ),
1055+ Ch3 = rabbit_ct_client_helpers :open_channel (Config , Server2 ),
1056+ QName = ? config (queue_name , Config ),
1057+ Q1 = <<QName /binary , " _1" >>,
1058+ Q2 = <<QName /binary , " _2" >>,
1059+ Q3 = <<QName /binary , " _3" >>,
1060+ Args = [{<<" x-queue-type" >>, longstr , <<" quorum" >>},
1061+ {<<" x-single-active-consumer" >>, bool , true }],
1062+ ? assertEqual ({'queue.declare_ok' , Q1 , 0 , 0 }, declare (Ch1 , Q1 , Args )),
1063+ ? assertEqual ({'queue.declare_ok' , Q2 , 0 , 0 }, declare (Ch2 , Q2 , Args )),
1064+ ? assertEqual ({'queue.declare_ok' , Q3 , 0 , 0 }, declare (Ch3 , Q3 , Args )),
1065+
1066+ ok = subscribe (Ch1 , Q1 , false , <<" ch1-ctag1" >>, [{" x-priority" , byte , 3 }]),
1067+ ok = subscribe (Ch1 , Q2 , false , <<" ch1-ctag2" >>, [{" x-priority" , byte , 2 }]),
1068+ ok = subscribe (Ch1 , Q3 , false , <<" ch1-ctag3" >>, [{" x-priority" , byte , 1 }]),
1069+
1070+
1071+ ok = subscribe (Ch2 , Q1 , false , <<" ch2-ctag1" >>, [{" x-priority" , byte , 1 }]),
1072+ ok = subscribe (Ch2 , Q2 , false , <<" ch2-ctag2" >>, [{" x-priority" , byte , 3 }]),
1073+ ok = subscribe (Ch2 , Q3 , false , <<" ch2-ctag3" >>, [{" x-priority" , byte , 2 }]),
1074+
1075+ ok = subscribe (Ch3 , Q1 , false , <<" ch3-ctag1" >>, [{" x-priority" , byte , 2 }]),
1076+ ok = subscribe (Ch3 , Q2 , false , <<" ch3-ctag2" >>, [{" x-priority" , byte , 1 }]),
1077+ ok = subscribe (Ch3 , Q3 , false , <<" ch3-ctag3" >>, [{" x-priority" , byte , 3 }]),
1078+
1079+
1080+ RaNameQ1 = binary_to_atom (<<" %2F" , " _" , Q1 /binary >>, utf8 ),
1081+ RaNameQ2 = binary_to_atom (<<" %2F" , " _" , Q2 /binary >>, utf8 ),
1082+ RaNameQ3 = binary_to_atom (<<" %2F" , " _" , Q3 /binary >>, utf8 ),
1083+ % % assert each queue has a different consumer
1084+ QueryFun = fun rabbit_fifo :query_single_active_consumer /1 ,
1085+
1086+ % % Q1 should have the consumer on Ch1
1087+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag1" >>, _ }}}, _ },
1088+ rpc :call (Server0 , ra , local_query , [RaNameQ1 , QueryFun ])),
1089+
1090+ % % Q2 Ch2
1091+ ? assertMatch ({ok , {_ , {value , {<<" ch2-ctag2" >>, _ }}}, _ },
1092+ rpc :call (Server1 , ra , local_query , [RaNameQ2 , QueryFun ])),
1093+
1094+ % % Q3 Ch3
1095+ ? assertMatch ({ok , {_ , {value , {<<" ch3-ctag3" >>, _ }}}, _ },
1096+ rpc :call (Server2 , ra , local_query , [RaNameQ3 , QueryFun ])),
1097+
1098+ % % close Ch3
1099+ _ = rabbit_ct_client_helpers :close_channel (Ch3 ),
1100+ flush (100 ),
1101+
1102+ % % assert Q3 has Ch2 (priority 2) as consumer
1103+ ? assertMatch ({ok , {_ , {value , {<<" ch2-ctag3" >>, _ }}}, _ },
1104+ rpc :call (Server2 , ra , local_query , [RaNameQ3 , QueryFun ])),
1105+
1106+ % % close Ch2
1107+ _ = rabbit_ct_client_helpers :close_channel (Ch2 ),
1108+ flush (100 ),
1109+
1110+ % % assert all queues as has Ch1 as consumer
1111+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag1" >>, _ }}}, _ },
1112+ rpc :call (Server0 , ra , local_query , [RaNameQ1 , QueryFun ])),
1113+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag2" >>, _ }}}, _ },
1114+ rpc :call (Server0 , ra , local_query , [RaNameQ2 , QueryFun ])),
1115+ ? assertMatch ({ok , {_ , {value , {<<" ch1-ctag3" >>, _ }}}, _ },
1116+ rpc :call (Server0 , ra , local_query , [RaNameQ3 , QueryFun ])),
1117+ ok .
1118+
1119+ force_shrink_member_to_current_member (Config ) ->
1120+ [Server0 , Server1 , Server2 ] =
1121+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1122+
1123+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1124+ QQ = ? config (queue_name , Config ),
1125+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1126+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1127+
1128+ RaName = ra_name (QQ ),
1129+ rabbit_ct_client_helpers :publish (Ch , QQ , 3 ),
1130+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1131+
1132+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1133+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1134+ ? assertEqual (3 , length (Nodes0 )),
1135+
1136+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1137+ force_shrink_member_to_current_member , [<<" /" >>, QQ ]),
1138+
1139+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1140+
1141+ {ok , Q1 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1142+ #{nodes := Nodes1 } = amqqueue :get_type_state (Q1 ),
1143+ ? assertEqual (1 , length (Nodes1 )),
1144+
1145+ % % grow queues back to all nodes
1146+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1147+
1148+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1149+ {ok , Q2 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [QQ , <<" /" >>]),
1150+ #{nodes := Nodes2 } = amqqueue :get_type_state (Q2 ),
1151+ ? assertEqual (3 , length (Nodes2 )).
1152+
1153+ force_all_queues_shrink_member_to_current_member (Config ) ->
1154+ [Server0 , Server1 , Server2 ] =
1155+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1156+
1157+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1158+ QQ = ? config (queue_name , Config ),
1159+ AQ = ? config (alt_queue_name , Config ),
1160+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1161+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1162+ ? assertEqual ({'queue.declare_ok' , AQ , 0 , 0 },
1163+ declare (Ch , AQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1164+
1165+ QQs = [QQ , AQ ],
1166+
1167+ [begin
1168+ RaName = ra_name (Q ),
1169+ rabbit_ct_client_helpers :publish (Ch , Q , 3 ),
1170+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1171+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1172+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1173+ ? assertEqual (3 , length (Nodes0 ))
1174+ end || Q <- QQs ],
1175+
1176+ rabbit_ct_broker_helpers :rpc (Config , 0 , rabbit_quorum_queue ,
1177+ force_all_queues_shrink_member_to_current_member , []),
1178+
1179+ [begin
1180+ RaName = ra_name (Q ),
1181+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1182+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1183+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1184+ ? assertEqual (1 , length (Nodes0 ))
1185+ end || Q <- QQs ],
1186+
1187+ % % grow queues back to all nodes
1188+ [rpc :call (Server0 , rabbit_quorum_queue , grow , [S , <<" /" >>, <<" .*" >>, all ]) || S <- [Server1 , Server2 ]],
1189+
1190+ [begin
1191+ RaName = ra_name (Q ),
1192+ wait_for_messages_ready ([Server0 ], RaName , 3 ),
1193+ {ok , Q0 } = rpc :call (Server0 , rabbit_amqqueue , lookup , [Q , <<" /" >>]),
1194+ #{nodes := Nodes0 } = amqqueue :get_type_state (Q0 ),
1195+ ? assertEqual (3 , length (Nodes0 ))
1196+ end || Q <- QQs ].
1197+
1198+ priority_queue_fifo (Config ) ->
1199+ % % testing: if hi priority messages are published before lo priority
1200+ % % messages they are always consumed first (fifo)
1201+ check_quorum_queues_v4_compat (Config ),
1202+ [Server0 | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1203+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1204+ Queue = ? config (queue_name , Config ),
1205+ ? assertEqual ({'queue.declare_ok' , Queue , 0 , 0 },
1206+ declare (Ch , Queue ,
1207+ [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1208+ ExpectedHi =
1209+ [begin
1210+ MsgP5 = integer_to_binary (P ),
1211+ ok = amqp_channel :cast (Ch , # 'basic.publish' {routing_key = Queue },
1212+ # amqp_msg {props = # 'P_basic' {priority = P },
1213+ payload = MsgP5 }),
1214+ MsgP5
1215+ % % high priority is > 4
1216+ end || P <- lists :seq (5 , 10 )],
1217+
1218+ ExpectedLo =
1219+ [begin
1220+ MsgP1 = integer_to_binary (P ),
1221+ ok = amqp_channel :cast (Ch , # 'basic.publish' {routing_key = Queue },
1222+ # amqp_msg {props = # 'P_basic' {priority = P },
1223+ payload = MsgP1 }),
1224+ MsgP1
1225+ end || P <- lists :seq (0 , 4 )],
1226+
1227+ validate_queue (Ch , Queue , ExpectedHi ++ ExpectedLo ),
1228+ ok .
1229+
1230+ priority_queue_2_1_ratio (Config ) ->
1231+ % % testing: if lo priority messages are published before hi priority
1232+ % % messages are consumed in a 2:1 hi to lo ratio
1233+ check_quorum_queues_v4_compat (Config ),
1234+ [Server0 | _ ] = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1235+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1236+ Queue = ? config (queue_name , Config ),
1237+ ? assertEqual ({'queue.declare_ok' , Queue , 0 , 0 },
1238+ declare (Ch , Queue ,
1239+ [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1240+ ExpectedLo =
1241+ [begin
1242+ MsgP1 = integer_to_binary (P ),
1243+ ok = amqp_channel :cast (Ch , # 'basic.publish' {routing_key = Queue },
1244+ # amqp_msg {props = # 'P_basic' {priority = P },
1245+ payload = MsgP1 }),
1246+ MsgP1
1247+ end || P <- lists :seq (0 , 4 )],
1248+ ExpectedHi =
1249+ [begin
1250+ MsgP5 = integer_to_binary (P ),
1251+ ok = amqp_channel :cast (Ch , # 'basic.publish' {routing_key = Queue },
1252+ # amqp_msg {props = # 'P_basic' {priority = P },
1253+ payload = MsgP5 }),
1254+ MsgP5
1255+ % % high priority is > 4
1256+ end || P <- lists :seq (5 , 14 )],
1257+
1258+ Expected = lists_interleave (ExpectedLo , ExpectedHi ),
1259+
1260+ validate_queue (Ch , Queue , Expected ),
1261+ ok .
1262+
1263+ >>>>>>> 10 dbde1f71 (QQ tests for force - shrink to current member operations )
9951264reject_after_leader_transfer (Config ) ->
9961265 [Server0 , Server1 , Server2 ] =
9971266 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
0 commit comments