@@ -70,7 +70,6 @@ public class JobVertexScalerTest {
7070
7171 private static List <Collection <ShipStrategy >> adjustmentInputsProvider () {
7272 return List .of (
73- List .of (),
7473 List .of (ShipStrategy .HASH ),
7574 List .of (ShipStrategy .REBALANCE , ShipStrategy .HASH , ShipStrategy .RESCALE ));
7675 }
@@ -936,9 +935,8 @@ public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> inputSh
936935 assertThat (event .getReason ()).isEqualTo (INEFFECTIVE_SCALING );
937936 }
938937
939- @ ParameterizedTest
940- @ MethodSource ("sourceInputsProvider" )
941- public void testNumPartitionsAdjustment (Collection <ShipStrategy > inputShipStrategies ) {
938+ @ Test
939+ public void testNumPartitionsAdjustment () {
942940 final int minParallelism = 1 ;
943941 final int maxParallelism = Integer .MAX_VALUE ;
944942 final var vertex = new JobVertexID ();
@@ -948,7 +946,7 @@ public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrate
948946 JobVertexScaler .scale (
949947 vertex ,
950948 6 ,
951- inputShipStrategies ,
949+ List . of () ,
952950 15 ,
953951 128 ,
954952 0.4 ,
@@ -961,117 +959,59 @@ public void testNumPartitionsAdjustment(Collection<ShipStrategy> inputShipStrate
961959 JobVertexScaler .scale (
962960 vertex ,
963961 7 ,
964- inputShipStrategies ,
962+ List . of () ,
965963 15 ,
966964 128 ,
967965 0.8 ,
968966 minParallelism ,
969967 maxParallelism ,
970968 eventCollector ,
971969 context ));
972- assertEquals (
973- 11 ,
974- JobVertexScaler .scale (
975- vertex ,
976- 7 ,
977- inputShipStrategies ,
978- 21 ,
979- 20 ,
980- 2.1 ,
981- minParallelism ,
982- maxParallelism ,
983- eventCollector ,
984- context ));
985970 assertEquals (
986971 18 ,
987972 JobVertexScaler .scale (
988973 vertex ,
989- 24 ,
990- inputShipStrategies ,
974+ 20 ,
975+ List . of () ,
991976 35 ,
992- 24 ,
993- 0.8 ,
994- 18 ,
977+ 30 ,
978+ 0.9 ,
979+ minParallelism ,
995980 maxParallelism ,
996981 eventCollector ,
997982 context ));
998983
999984 assertEquals (
1000- 18 ,
985+ 20 ,
1001986 JobVertexScaler .scale (
1002987 vertex ,
1003- 24 ,
1004- inputShipStrategies ,
988+ 22 ,
989+ List . of () ,
1005990 35 ,
1006- 24 ,
1007- 0.8 ,
1008- 17 ,
991+ 30 ,
992+ 1.1 ,
993+ 20 ,
1009994 maxParallelism ,
1010995 eventCollector ,
1011996 context ));
1012997
1013- // numSourcePartition > upperBound
1014998 assertEquals (
1015999 100 ,
10161000 JobVertexScaler .scale (
10171001 vertex ,
10181002 80 ,
1019- inputShipStrategies ,
1003+ List . of () ,
10201004 200 ,
10211005 128 ,
10221006 1.4 ,
10231007 minParallelism ,
10241008 maxParallelism ,
10251009 eventCollector ,
10261010 context ));
1027-
1028- assertEquals (
1029- 20 ,
1030- JobVertexScaler .scale (
1031- vertex ,
1032- 15 ,
1033- inputShipStrategies ,
1034- 200 ,
1035- 128 ,
1036- 1.2 ,
1037- minParallelism ,
1038- maxParallelism ,
1039- eventCollector ,
1040- context ));
1041-
1042- // minParallelism limited
1043- assertEquals (
1044- 5 ,
1045- JobVertexScaler .scale (
1046- vertex ,
1047- 6 ,
1048- inputShipStrategies ,
1049- 15 ,
1050- 128 ,
1051- 0.5 ,
1052- 4 ,
1053- maxParallelism ,
1054- eventCollector ,
1055- context ));
1056-
1057- assertEquals (
1058- 16 ,
1059- JobVertexScaler .scale (
1060- vertex ,
1061- 6 ,
1062- inputShipStrategies ,
1063- 15 ,
1064- 128 ,
1065- 0.5 ,
1066- 16 ,
1067- maxParallelism ,
1068- eventCollector ,
1069- context ));
10701011 }
10711012
1072- @ ParameterizedTest
1073- @ MethodSource ("sourceInputsProvider" )
1074- public void testSendingScalingLimitedEvents (Collection <ShipStrategy > inputShipStrategies ) {
1013+ @ Test
1014+ public void testSendingScalingLimitedEvents () {
10751015 var jobVertexID = new JobVertexID ();
10761016 conf .set (AutoScalerOptions .TARGET_UTILIZATION , 1.0 );
10771017 conf .set (AutoScalerOptions .SCALE_DOWN_INTERVAL , Duration .ZERO );
@@ -1086,7 +1026,7 @@ public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipSt
10861026 vertexScaler .computeScaleTargetParallelism (
10871027 context ,
10881028 jobVertexID ,
1089- inputShipStrategies ,
1029+ List . of () ,
10901030 evaluated ,
10911031 history ,
10921032 restartTime ,
@@ -1099,14 +1039,7 @@ public void testSendingScalingLimitedEvents(Collection<ShipStrategy> inputShipSt
10991039 assertThat (partitionLimitedEvent .getMessage ())
11001040 .isEqualTo (
11011041 String .format (
1102- SCALE_LIMITED_MESSAGE_FORMAT ,
1103- jobVertexID ,
1104- 20 ,
1105- 15 ,
1106- String .format (
1107- "numPartitions : %s,upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
1108- + "parallelismLowerLimit: %s." ,
1109- 15 , 200 , 1 )));
1042+ SCALE_LIMITED_MESSAGE_FORMAT , jobVertexID , 20 , 15 , 15 , 200 , 1 ));
11101043 }
11111044
11121045 private Map <ScalingMetric , EvaluatedScalingMetric > evaluated (
0 commit comments