diff --git a/examples/autoscaling/autoscaling-dynamic.yaml b/examples/autoscaling/autoscaling-dynamic.yaml index 85f5eb0d9d..0e6466a4c5 100644 --- a/examples/autoscaling/autoscaling-dynamic.yaml +++ b/examples/autoscaling/autoscaling-dynamic.yaml @@ -27,7 +27,7 @@ spec: job.autoscaler.enabled: "true" job.autoscaler.stabilization.interval: "1m" job.autoscaler.metrics.window: "15m" - job.autoscaler.target.utilization: "0.5" + job.autoscaler.utilization.target: "0.5" job.autoscaler.target.utilization.boundary: "0.3" pipeline.max-parallelism: "32" taskmanager.numberOfTaskSlots: "4" diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index 8bace9e15d..ee3ded9e0b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -149,24 +149,39 @@ public CollectedMetricHistory updateMetrics( // Add scaling metrics to history if they were computed successfully metricHistory.put(now, scalingMetrics); - if (isStabilizing) { - LOG.info("Stabilizing until {}", readable(stableTime)); - stateStore.storeCollectedMetrics(ctx, metricHistory); - return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs); - } - var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs); if (now.isBefore(windowFullTime)) { - LOG.info("Metric window not full until {}", readable(windowFullTime)); + if (isStabilizing) { + LOG.info("Stabilizing until {}", readable(stableTime)); + } else { + LOG.info( + "Metric window is not full until {}. {} samples collected so far", + readable(windowFullTime), + metricHistory.size()); + } } else { collectedMetrics.setFullyCollected(true); // Trim metrics outside the metric window from metrics history - metricHistory.headMap(now.minus(metricWindowSize)).clear(); + var trimBefore = now.minus(metricWindowSize); + int numDropped = removeMetricsBefore(trimBefore, metricHistory); + LOG.debug( + "Metric window is now full. Dropped {} samples before {}, keeping {}.", + numDropped, + readable(trimBefore), + metricHistory.size()); } stateStore.storeCollectedMetrics(ctx, metricHistory); return collectedMetrics; } + private int removeMetricsBefore( + Instant cutOffTimestamp, SortedMap metricHistory) { + var toBeDropped = metricHistory.headMap(cutOffTimestamp); + var numDropped = toBeDropped.size(); + toBeDropped.clear(); + return numDropped; + } + protected abstract Map queryJmMetrics(Context ctx) throws Exception; protected abstract Map queryTmMetrics(Context ctx) diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java index ad2f5c7225..1176075f9c 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -129,13 +129,13 @@ public void testEndToEnd() throws Exception { setDefaultMetrics(metricsCollector); - // We haven't left the stabilization period - // => no metrics reporting and collection should take place + // We haven't left the stabilization period, but we're collecting and returning the metrics + // for reporting var collectedMetrics = metricsCollector.updateMetrics(context, stateStore); - assertTrue(collectedMetrics.getMetricHistory().isEmpty()); + assertEquals(1, collectedMetrics.getMetricHistory().size()); - // We haven't collected a full window yet, no metrics should be reported but metrics should - // still get collected. + // We haven't collected a full window yet, but we're collecting and returning the metrics + // for reporting clock = Clock.offset(clock, conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); metricsCollector.setClock(clock); collectedMetrics = metricsCollector.updateMetrics(context, stateStore); @@ -294,12 +294,12 @@ public void testJobDetailsRestCompatibility() throws JsonProcessingException { public void testMetricCollectorWindow() throws Exception { setDefaultMetrics(metricsCollector); var metricsHistory = metricsCollector.updateMetrics(context, stateStore); - assertEquals(0, metricsHistory.getMetricHistory().size()); + assertEquals(1, metricsHistory.getMetricHistory().size()); - // Not stable, nothing should be collected + // Not stable, metrics collected and reported metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1))); metricsHistory = metricsCollector.updateMetrics(context, stateStore); - assertEquals(0, metricsHistory.getMetricHistory().size()); + assertEquals(2, metricsHistory.getMetricHistory().size()); // Update clock to stable time var conf = context.getConfiguration(); @@ -336,10 +336,10 @@ public void testMetricCollectorWindow() throws Exception { metricsHistory = metricsCollector.updateMetrics(context, stateStore); assertEquals(1, metricsHistory.getMetricHistory().size()); - // Existing metrics should be cleared on job updates + // Existing metrics should be cleared on job updates, and start collecting from fresh metricsCollector.setJobUpdateTs(clock.instant().plus(Duration.ofDays(10))); metricsHistory = metricsCollector.updateMetrics(context, stateStore); - assertEquals(0, metricsHistory.getMetricHistory().size()); + assertEquals(1, metricsHistory.getMetricHistory().size()); } @Test @@ -351,10 +351,11 @@ public void testClearHistoryOnTopoChange() throws Exception { setDefaultMetrics(metricsCollector); - // We haven't left the stabilization period - // => no metrics reporting and collection should take place + // We haven't left the stabilization period, we're returning the metrics + // but don't act on it since the metric window is not full yet var collectedMetrics = metricsCollector.updateMetrics(context, stateStore); - assertTrue(collectedMetrics.getMetricHistory().isEmpty()); + assertFalse(collectedMetrics.getMetricHistory().isEmpty()); + assertFalse(collectedMetrics.isFullyCollected()); } @Test @@ -571,19 +572,18 @@ public void testMetricCollectionDuringStabilization() throws Exception { .set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMillis(100)); context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, Duration.ofMillis(100)); - // Within stabilization period we simply collect metrics but do not return them + // Until window is full (time=200) we keep returning stabilizing metrics metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), ZoneId.systemDefault())); - assertTrue( - metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty()); + assertEquals( + 1, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size()); assertEquals(1, stateStore.getCollectedMetrics(context).size()); metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), ZoneId.systemDefault())); - assertTrue( - metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty()); + assertEquals( + 2, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size()); assertEquals(2, stateStore.getCollectedMetrics(context).size()); testTolerateMetricsMissingDuringStabilizationPhase(topology); - // Until window is full (time=200) we keep returning stabilizing metrics metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), ZoneId.systemDefault())); assertEquals( 3, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());