diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 5a43ad3610..1d32b1aab6 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -50,6 +50,7 @@
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
@@ -66,6 +67,14 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     protected static final String INEFFECTIVE_MESSAGE_FORMAT =
             "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
 
+    @VisibleForTesting protected static final String SCALING_LIMITED = "ScalingLimited";
+
+    @VisibleForTesting
+    protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
+            "Scaling limited detected for %s (expected parallelism: %s, actual parallelism: %s). "
+                    + "Scaling limited due to numKeyGroupsOrPartitions: %s, "
+                    + "upperBoundForAlignment (maxParallelism or parallelismUpperLimit): %s, parallelismLowerLimit: %s.";
+
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
     private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
@@ -193,12 +202,16 @@ public ParallelismChange computeScaleTargetParallelism(
 
         int newParallelism =
                 scale(
+                        vertex,
                         currentParallelism,
                         inputShipStrategies,
+                        (int) evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
-                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
+                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
+                        autoScalerEventHandler,
+                        context);
 
         if (newParallelism == currentParallelism) {
             // Clear delayed scale down request if the new parallelism is equal to
@@ -345,15 +358,22 @@ private boolean detectIneffectiveScaleUp(
      * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
      * parallelism for source and keyed vertex such that it divides the maxParallelism without a
      * remainder.
+     *
+     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
+     * number of source partitions if a vertex has a known source partition count.
      */
     @VisibleForTesting
-    protected static int scale(
+    protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
+            JobVertexID vertex,
             int currentParallelism,
             Collection<ShipStrategy> inputShipStrategies,
+            int numSourcePartitions,
             int maxParallelism,
             double scaleFactor,
             int parallelismLowerLimit,
-            int parallelismUpperLimit) {
+            int parallelismUpperLimit,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            Context context) {
         checkArgument(
                 parallelismLowerLimit <= parallelismUpperLimit,
                 "The parallelism lower limitation must not be greater than the parallelism upper limitation.");
@@ -383,23 +403,62 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        var numKeyGroupsOrPartitions =
+                numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
+        var upperBoundForAlignment =
+                Math.min(
+                        // Optimize the case where newParallelism <= maxParallelism / 2
+                        newParallelism > numKeyGroupsOrPartitions / 2
+                                ? numKeyGroupsOrPartitions
+                                : numKeyGroupsOrPartitions / 2,
+                        upperBound);
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        // When the rounded-up parallelism does not evenly divide numKeyGroupsOrPartitions,
+        // try to find the smallest parallelism that can still satisfy
+        // the current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }
+
+        p = Math.max(p, parallelismLowerLimit);
+        var message =
+                String.format(
+                        SCALE_LIMITED_MESSAGE_FORMAT,
+                        vertex,
+                        newParallelism,
+                        p,
+                        numKeyGroupsOrPartitions,
+                        upperBound,
+                        parallelismLowerLimit);
+        eventHandler.handleEvent(
+                context,
+                AutoScalerEventHandler.Type.Warning,
+                SCALING_LIMITED,
+                message,
+                SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
+                context.getConfiguration().get(SCALING_EVENT_INTERVAL));
+        return p;
     }
 
     @VisibleForTesting
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index ed7906c484..8bace9e15d 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -204,7 +204,7 @@ protected JobTopology getJobTopology(
         Set<JobVertexID> vertexSet = Set.copyOf(t.getVerticesInTopologicalOrder());
         updateVertexList(stateStore, ctx, clock.instant(), vertexSet);
-        updateKafkaPulsarSourceMaxParallelisms(ctx, jobDetailsInfo.getJobId(), t);
+        updateKafkaPulsarSourceNumPartitions(ctx, jobDetailsInfo.getJobId(), t);
         excludeVerticesFromScaling(ctx.getConfiguration(), t.getFinishedVertices());
         return t;
     }
@@ -249,7 +249,7 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
                 json, slotSharingGroupIdMap, maxParallelismMap, metrics, finished);
     }
 
-    private void updateKafkaPulsarSourceMaxParallelisms(
+    private void updateKafkaPulsarSourceNumPartitions(
             Context ctx, JobID jobId, JobTopology topology) throws Exception {
         try (var restClient = ctx.getRestClusterClient()) {
             Pattern partitionRegex =
@@ -284,7 +284,7 @@ private void updateKafkaPulsarSourceMaxParallelisms(
                             "Updating source {} max parallelism based on available partitions to {}",
                             sourceVertex,
                             numPartitions);
-                    topology.get(sourceVertex).updateMaxParallelism((int) numPartitions);
+                    topology.get(sourceVertex).setNumSourcePartitions((int) numPartitions);
                 }
             }
         }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 085d80ae62..5bbc09a3e2 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -55,6 +55,7 @@
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_TASK_SLOTS_USED;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
@@ -166,6 +167,11 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
 
         evaluatedMetrics.put(
                 MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
+
+        evaluatedMetrics.put(
+                NUM_SOURCE_PARTITIONS,
+                EvaluatedScalingMetric.of(vertexInfo.getNumSourcePartitions()));
+
         computeProcessingRateThresholds(evaluatedMetrics, conf, processingBacklog, restartTime);
         return evaluatedMetrics;
     }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 5466f691e7..2b98bdcc23 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -53,6 +53,10 @@
     /** Job vertex max parallelism. */
     MAX_PARALLELISM(false),
+
+    /** Source vertex partition count. */
+    NUM_SOURCE_PARTITIONS(false),
+
     /** Upper boundary of the target data rate range. */
     SCALE_UP_RATE_THRESHOLD(false),
 
@@ -101,6 +105,7 @@ public enum ScalingMetric {
                     PARALLELISM,
                     RECOMMENDED_PARALLELISM,
                     MAX_PARALLELISM,
+                    NUM_SOURCE_PARTITIONS,
                     SCALE_UP_RATE_THRESHOLD,
                     SCALE_DOWN_RATE_THRESHOLD,
                     EXPECTED_PROCESSING_RATE);
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
index 705dd4c42a..6016d70ecc 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
@@ -46,7 +46,7 @@ public class VertexInfo {
     @Setter(AccessLevel.NONE)
     private int maxParallelism;
 
-    private final int originalMaxParallelism;
+    @Setter private int numSourcePartitions;
 
     private final boolean finished;
 
@@ -65,7 +65,6 @@ public VertexInfo(
         this.inputs = inputs;
         this.parallelism = parallelism;
         this.maxParallelism = maxParallelism;
-        this.originalMaxParallelism = maxParallelism;
         this.finished = finished;
         this.ioMetrics = ioMetrics;
     }
@@ -99,8 +98,4 @@ public VertexInfo(
             int maxParallelism) {
         this(id, inputs, parallelism, maxParallelism, null);
     }
-
-    public void updateMaxParallelism(int maxParallelism) {
-        this.maxParallelism = Math.min(originalMaxParallelism, maxParallelism);
-    }
 }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 17ade6c8fa..ae832073e2 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -47,6 +47,8 @@
 import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_MESSAGE_FORMAT;
 import static org.apache.flink.autoscaler.JobVertexScaler.INEFFECTIVE_SCALING;
+import static org.apache.flink.autoscaler.JobVertexScaler.SCALE_LIMITED_MESSAGE_FORMAT;
+import static org.apache.flink.autoscaler.JobVertexScaler.SCALING_LIMITED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -68,7 +70,6 @@ public class JobVertexScalerTest {
 
     private static List<Collection<ShipStrategy>> adjustmentInputsProvider() {
         return List.of(
-                List.of(),
                 List.of(ShipStrategy.HASH),
                 List.of(ShipStrategy.REBALANCE, ShipStrategy.HASH, ShipStrategy.RESCALE));
     }
@@ -237,35 +238,85 @@ public void testParallelismScaling(Collection<ShipStrategy> inputShipStrategies)
     public void testParallelismComputation() {
         final int minParallelism = 1;
         final int maxParallelism = Integer.MAX_VALUE;
+        final var vertex = new JobVertexID();
         assertEquals(
                 1,
JobVertexScaler.scale( - 1, NOT_ADJUST_INPUTS, 720, 0.0001, minParallelism, maxParallelism)); + vertex, + 1, + NOT_ADJUST_INPUTS, + 0, + 720, + 0.0001, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 1, JobVertexScaler.scale( - 2, NOT_ADJUST_INPUTS, 720, 0.1, minParallelism, maxParallelism)); + vertex, + 2, + NOT_ADJUST_INPUTS, + 0, + 720, + 0.1, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 5, JobVertexScaler.scale( - 6, NOT_ADJUST_INPUTS, 720, 0.8, minParallelism, maxParallelism)); + vertex, + 6, + NOT_ADJUST_INPUTS, + 0, + 720, + 0.8, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 24, JobVertexScaler.scale( - 16, NOT_ADJUST_INPUTS, 128, 1.5, minParallelism, maxParallelism)); + vertex, + 16, + NOT_ADJUST_INPUTS, + 0, + 128, + 1.5, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 400, JobVertexScaler.scale( - 200, NOT_ADJUST_INPUTS, 720, 2, minParallelism, maxParallelism)); + vertex, + 200, + NOT_ADJUST_INPUTS, + 0, + 720, + 2, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 720, JobVertexScaler.scale( + vertex, 200, NOT_ADJUST_INPUTS, + 0, 720, Integer.MAX_VALUE, minParallelism, - maxParallelism)); + maxParallelism, + eventCollector, + context)); } @ParameterizedTest @@ -274,45 +325,146 @@ public void testParallelismComputationWithAdjustment( Collection inputShipStrategies) { final int minParallelism = 1; final int maxParallelism = Integer.MAX_VALUE; + final var vertex = new JobVertexID(); + assertEquals( 6, JobVertexScaler.scale( - 6, inputShipStrategies, 36, 0.8, minParallelism, maxParallelism)); + vertex, + 6, + inputShipStrategies, + 0, + 36, + 0.8, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 32, JobVertexScaler.scale( - 16, inputShipStrategies, 128, 1.5, minParallelism, maxParallelism)); + vertex, + 16, + inputShipStrategies, + 0, + 128, + 1.5, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 360, JobVertexScaler.scale( - 200, inputShipStrategies, 720, 1.3, minParallelism, maxParallelism)); + vertex, + 200, + inputShipStrategies, + 0, + 720, + 1.3, + minParallelism, + maxParallelism, + eventCollector, + context)); assertEquals( 720, JobVertexScaler.scale( + vertex, 200, inputShipStrategies, + 0, 720, Integer.MAX_VALUE, minParallelism, - maxParallelism)); + maxParallelism, + eventCollector, + context)); } @ParameterizedTest @MethodSource("adjustmentInputsProvider") public void testParallelismComputationWithLimit(Collection inputShipStrategies) { - assertEquals(5, JobVertexScaler.scale(6, inputShipStrategies, 720, 0.8, 1, 700)); - assertEquals(8, JobVertexScaler.scale(8, inputShipStrategies, 720, 0.8, 8, 700)); + final var vertex = new JobVertexID(); assertEquals( - 32, JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 1, Integer.MAX_VALUE)); + 5, + JobVertexScaler.scale( + vertex, + 6, + inputShipStrategies, + 0, + 720, + 0.8, + 1, + 700, + eventCollector, + context)); assertEquals( - 64, - JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 60, Integer.MAX_VALUE)); + 8, + JobVertexScaler.scale( + vertex, + 8, + inputShipStrategies, + 0, + 720, + 0.8, + 8, + 700, + eventCollector, + context)); - assertEquals(300, JobVertexScaler.scale(200, inputShipStrategies, 720, 2, 1, 300)); assertEquals( - 600, - JobVertexScaler.scale(200, inputShipStrategies, 720, Integer.MAX_VALUE, 1, 600)); + 32, + JobVertexScaler.scale( + vertex, + 16, + 
inputShipStrategies, + 0, + 128, + 1.5, + 1, + Integer.MAX_VALUE, + eventCollector, + context)); + assertEquals( + 64, + JobVertexScaler.scale( + vertex, + 16, + inputShipStrategies, + 0, + 128, + 1.5, + 60, + Integer.MAX_VALUE, + eventCollector, + context)); + assertEquals( + 240, + JobVertexScaler.scale( + vertex, + 200, + inputShipStrategies, + 0, + 720, + 2, + 1, + 300, + eventCollector, + context)); + assertEquals( + 360, + JobVertexScaler.scale( + vertex, + 200, + inputShipStrategies, + 0, + 720, + Integer.MAX_VALUE, + 1, + 600, + eventCollector, + context)); } @Test @@ -323,12 +475,16 @@ public void ensureMinParallelismDoesNotExceedMax() { assertEquals( 600, JobVertexScaler.scale( + new JobVertexID(), 200, NOT_ADJUST_INPUTS, + 0, 720, Integer.MAX_VALUE, 500, - 499))); + 499, + eventCollector, + context))); } @Test @@ -775,11 +931,119 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); } + @Test + public void testNumPartitionsAdjustment() { + final int parallelismLowerLimit = 1; + final int parallelismUpperLimit = Integer.MAX_VALUE; + final var vertex = new JobVertexID(); + + assertEquals( + 3, + JobVertexScaler.scale( + vertex, + 6, + List.of(), + 15, + 128, + 0.4, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + assertEquals( + 15, + JobVertexScaler.scale( + vertex, + 7, + List.of(), + 15, + 128, + 1.2, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + assertEquals( + 18, + JobVertexScaler.scale( + vertex, + 20, + List.of(), + 35, + 30, + 0.9, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + + assertEquals( + 20, + JobVertexScaler.scale( + vertex, + 22, + List.of(), + 35, + 30, + 1.1, + 20, + parallelismUpperLimit, + eventCollector, + context)); + + assertEquals( + 100, + JobVertexScaler.scale( + vertex, + 80, + List.of(), + 200, + 128, + 1.4, + parallelismLowerLimit, + parallelismUpperLimit, + eventCollector, + context)); + } + + @Test + public void testSendingScalingLimitedEvents() { + var jobVertexID = new JobVertexID(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO); + conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO); + var evaluated = evaluated(10, 200, 100); + evaluated.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15)); + var history = new TreeMap(); + var delayedScaleDown = new DelayedScaleDown(); + // partition limited + assertEquals( + ParallelismChange.required(15), + vertexScaler.computeScaleTargetParallelism( + context, + jobVertexID, + List.of(), + evaluated, + history, + restartTime, + delayedScaleDown)); + assertEquals(1, eventCollector.events.size()); + TestingEventCollector.Event> partitionLimitedEvent = + eventCollector.events.poll(); + assertThat(partitionLimitedEvent).isNotNull(); + assertEquals(SCALING_LIMITED, partitionLimitedEvent.getReason()); + assertThat(partitionLimitedEvent.getMessage()) + .isEqualTo( + String.format( + SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1)); + } + private Map evaluated( int parallelism, double targetDataRate, double trueProcessingRate) { var metrics = new HashMap(); metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism)); metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720)); + metrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(0)); metrics.put( 
ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(targetDataRate, targetDataRate)); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index 1d3d9bffaf..102586a3ef 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -233,7 +233,7 @@ private void setDefaultMetrics( } @Test - public void testKafkaPulsarPartitionMaxParallelism() throws Exception { + public void testKafkaPulsarNumPartitions() throws Exception { setDefaultMetrics(metricsCollector); metricsCollector.updateMetrics(context, stateStore); @@ -241,13 +241,8 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception { metricsCollector.setClock(clock); var collectedMetrics = metricsCollector.updateMetrics(context, stateStore); - - assertEquals(720, collectedMetrics.getJobTopology().get(source1).getMaxParallelism()); - assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism()); - clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault()); metricsCollector.setClock(clock); - metricsCollector.setMetricNames( Map.of( source1, @@ -260,8 +255,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception { "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset"))); collectedMetrics = metricsCollector.updateMetrics(context, stateStore); - assertEquals(5, collectedMetrics.getJobTopology().get(source1).getMaxParallelism()); - assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism()); + assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumSourcePartitions()); metricsCollector.setMetricNames( Map.of( @@ -280,7 +274,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception { "0.Source__pulsar_source[1].PulsarConsumer" + ".persistent_//public/default/testTopic-partition-4.m962n.numMsgsReceived"))); collectedMetrics = metricsCollector.updateMetrics(context, stateStore); - assertEquals(5, collectedMetrics.getJobTopology().get(source2).getMaxParallelism()); + assertEquals(5, collectedMetrics.getJobTopology().get(source2).getNumSourcePartitions()); } @Test diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index 50e5870ac3..3eeac97b06 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -475,9 +475,9 @@ public void testMemoryTuning(boolean memoryTuningEnabled) throws Exception { TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.652", TaskManagerOptions.NETWORK_MEMORY_MIN.key(), - "24320 kb", + "23040 kb", TaskManagerOptions.NETWORK_MEMORY_MAX.key(), - "24320 kb", + "23040 kb", TaskManagerOptions.JVM_METASPACE.key(), "360 mb", TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(), @@ -485,7 +485,7 @@ public void testMemoryTuning(boolean memoryTuningEnabled) throws Exception { TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(), "0 bytes", TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), - "20400832696 bytes"); + "20399521976 bytes"); } else { assertEquals(context.getConfiguration(), 
capturedConfForMaxResources); expected = Map.of(); @@ -752,7 +752,7 @@ public void testAdjustByMaxParallelism() throws Exception { .containsAllEntriesOf( Map.of( "0bfd135746ac8efb3cce668b12e16d3a", - "8", + "7", "869fb403873411306404e9f2e4438c0e", "7", "a6b7102b8d3e3a9564998c1ffeb5e2b7", @@ -880,6 +880,7 @@ private Map evaluated( var metrics = new HashMap(); metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism)); metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(MAX_PARALLELISM)); + metrics.put(ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(0)); metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target)); metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate)); metrics.put(
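
A self-contained sketch of the alignment logic added to JobVertexScaler#scale above, for readers who want to trace the arithmetic outside of Flink: the upward loop looks for a parallelism that divides the key-group/partition count evenly, and the downward fallback settles on the smallest parallelism that keeps the per-subtask load at the level implied by the originally computed parallelism. The class and method names below are illustrative only and are not part of the autoscaler API; the sample inputs mirror cases exercised by testNumPartitionsAdjustment in this patch.

/** Illustrative stand-alone sketch; not part of the Flink autoscaler API. */
public final class PartitionAlignmentSketch {

    /**
     * Mimics the alignment step of JobVertexScaler#scale: prefer a parallelism that divides
     * numKeyGroupsOrPartitions evenly; otherwise fall back towards the smallest parallelism
     * that keeps the per-subtask load at the level implied by the requested parallelism.
     */
    static int align(
            int requestedParallelism,
            int numKeyGroupsOrPartitions,
            int parallelismLowerLimit,
            int upperBound) {

        // Upward search: stop at upperBound, and do not search past the partition count
        // (or half of it when the requested parallelism is small enough).
        int upperBoundForAlignment =
                Math.min(
                        requestedParallelism > numKeyGroupsOrPartitions / 2
                                ? numKeyGroupsOrPartitions
                                : numKeyGroupsOrPartitions / 2,
                        upperBound);
        for (int p = requestedParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p; // even distribution: every subtask gets the same number of partitions
            }
        }

        // Downward fallback: walk down until the per-subtask (floor) load would grow beyond the
        // load implied by the requested parallelism; accept that point only if it divides evenly,
        // otherwise step back up by one. Finally clamp to the configured lower limit.
        int p = requestedParallelism;
        for (; p > 0; p--) {
            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / requestedParallelism) {
                if (numKeyGroupsOrPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        return Math.max(p, parallelismLowerLimit);
    }

    public static void main(String[] args) {
        // 15 partitions, requested 9 -> 15 (next value that divides 15 evenly).
        System.out.println(align(9, 15, 1, Integer.MAX_VALUE));
        // 35 partitions, requested 18, upper bound 30 -> 18 (no divisor in range, load unchanged).
        System.out.println(align(18, 35, 1, 30));
        // 200 partitions, requested 112, upper bound 128 -> 100 (scale down to an even split).
        System.out.println(align(112, 200, 1, 128));
    }
}

Running main prints 15, 18 and 100, which match the parallelisms asserted for the corresponding source-partition cases in testNumPartitionsAdjustment.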