Skip to content

Commit 214eb82

Browse files
author
huyuanfeng
committed
fix test case
1 parent 52e0e8a commit 214eb82

File tree

3 files changed

+5
-9
lines changed

3 files changed

+5
-9
lines changed

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public enum ScalingMetric {
104104
PARALLELISM,
105105
RECOMMENDED_PARALLELISM,
106106
MAX_PARALLELISM,
107+
NUM_PARTITIONS,
107108
SCALE_UP_RATE_THRESHOLD,
108109
SCALE_DOWN_RATE_THRESHOLD,
109110
EXPECTED_PROCESSING_RATE);

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,21 +233,16 @@ private void setDefaultMetrics(
233233
}
234234

235235
@Test
236-
public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
236+
public void testKafkaPulsarNumPartitions() throws Exception {
237237
setDefaultMetrics(metricsCollector);
238238
metricsCollector.updateMetrics(context, stateStore);
239239

240240
var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
241241
metricsCollector.setClock(clock);
242242

243243
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
244-
245-
assertEquals(720, collectedMetrics.getJobTopology().get(source1).getMaxParallelism());
246-
assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
247-
248244
clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), ZoneId.systemDefault());
249245
metricsCollector.setClock(clock);
250-
251246
metricsCollector.setMetricNames(
252247
Map.of(
253248
source1,
@@ -260,8 +255,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
260255
"1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
261256

262257
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
263-
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getMaxParallelism());
264-
assertEquals(720, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
258+
assertEquals(5, collectedMetrics.getJobTopology().get(source1).getNumPartitions());
265259

266260
metricsCollector.setMetricNames(
267261
Map.of(
@@ -280,7 +274,7 @@ public void testKafkaPulsarPartitionMaxParallelism() throws Exception {
280274
"0.Source__pulsar_source[1].PulsarConsumer"
281275
+ ".persistent_//public/default/testTopic-partition-4.m962n.numMsgsReceived")));
282276
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
283-
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getMaxParallelism());
277+
assertEquals(5, collectedMetrics.getJobTopology().get(source2).getNumPartitions());
284278
}
285279

286280
@Test

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
880880
var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
881881
metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
882882
metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(MAX_PARALLELISM));
883+
metrics.put(ScalingMetric.NUM_PARTITIONS, EvaluatedScalingMetric.of(0));
883884
metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target));
884885
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchupRate));
885886
metrics.put(

0 commit comments

Comments
 (0)