Skip to content

Commit 3f30475

Browse files
committed
[FLINK-36409] Publish some autoscaler metrics during stabilisation period
1 parent 44dd679 commit 3f30475

File tree

5 files changed

+43
-31
lines changed

5 files changed

+43
-31
lines changed

examples/autoscaling/autoscaling-dynamic.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ spec:
2727
job.autoscaler.enabled: "true"
2828
job.autoscaler.stabilization.interval: "1m"
2929
job.autoscaler.metrics.window: "15m"
30-
job.autoscaler.target.utilization: "0.5"
30+
job.autoscaler.utilization.target: "0.5"
3131
job.autoscaler.target.utilization.boundary: "0.3"
3232
pipeline.max-parallelism: "32"
3333
taskmanager.numberOfTaskSlots: "4"

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,19 +149,31 @@ public CollectedMetricHistory updateMetrics(
149149
// Add scaling metrics to history if they were computed successfully
150150
metricHistory.put(now, scalingMetrics);
151151

152-
if (isStabilizing) {
153-
LOG.info("Stabilizing until {}", readable(stableTime));
154-
stateStore.storeCollectedMetrics(ctx, metricHistory);
155-
return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
156-
}
157-
158152
var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs);
159153
if (now.isBefore(windowFullTime)) {
160-
LOG.info("Metric window not full until {}", readable(windowFullTime));
154+
if (isStabilizing) {
155+
LOG.info(
156+
"Stabilizing... until {}. {} samples collected",
157+
readable(stableTime),
158+
metricHistory.size());
159+
} else {
160+
LOG.info(
161+
"Metric window is not full until {}. {} samples collected",
162+
readable(windowFullTime),
163+
metricHistory.size());
164+
}
161165
} else {
162166
collectedMetrics.setFullyCollected(true);
163167
// Trim metrics outside the metric window from metrics history
164-
metricHistory.headMap(now.minus(metricWindowSize)).clear();
168+
var trimBefore = now.minus(metricWindowSize);
169+
var head = metricHistory.headMap(trimBefore);
170+
var dropped = head.size();
171+
head.clear();
172+
LOG.debug(
173+
"Metric window is now full. Dropped {} samples before {}, keeping {}.",
174+
dropped,
175+
readable(trimBefore),
176+
metricHistory.size());
165177
}
166178
stateStore.storeCollectedMetrics(ctx, metricHistory);
167179
return collectedMetrics;

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/DateTimeUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
public class DateTimeUtils {
2727

2828
private static final DateTimeFormatter DEFAULT_FORMATTER =
29-
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
29+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
3030

3131
/**
3232
* Convert an Instant to a readable format for the system default zone.

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,13 @@ public void testEndToEnd() throws Exception {
129129

130130
setDefaultMetrics(metricsCollector);
131131

132-
// We haven't left the stabilization period
133-
// => no metrics reporting and collection should take place
132+
// We haven't left the stabilization period, but we're collecting and returning the metrics
133+
// for reporting
134134
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
135-
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
135+
assertEquals(1, collectedMetrics.getMetricHistory().size());
136136

137-
// We haven't collected a full window yet, no metrics should be reported but metrics should
138-
// still get collected.
137+
// We haven't collected a full window yet, but we're collecting and returning the metrics
138+
// for reporting
139139
clock = Clock.offset(clock, conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
140140
metricsCollector.setClock(clock);
141141
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
@@ -294,12 +294,12 @@ public void testJobDetailsRestCompatibility() throws JsonProcessingException {
294294
public void testMetricCollectorWindow() throws Exception {
295295
setDefaultMetrics(metricsCollector);
296296
var metricsHistory = metricsCollector.updateMetrics(context, stateStore);
297-
assertEquals(0, metricsHistory.getMetricHistory().size());
297+
assertEquals(1, metricsHistory.getMetricHistory().size());
298298

299-
// Not stable, nothing should be collected
299+
// Not stable, metrics collected and reported
300300
metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
301301
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
302-
assertEquals(0, metricsHistory.getMetricHistory().size());
302+
assertEquals(2, metricsHistory.getMetricHistory().size());
303303

304304
// Update clock to stable time
305305
var conf = context.getConfiguration();
@@ -336,10 +336,10 @@ public void testMetricCollectorWindow() throws Exception {
336336
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
337337
assertEquals(1, metricsHistory.getMetricHistory().size());
338338

339-
// Existing metrics should be cleared on job updates
339+
// Existing metrics should be cleared on job updates, and start collecting from fresh
340340
metricsCollector.setJobUpdateTs(clock.instant().plus(Duration.ofDays(10)));
341341
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
342-
assertEquals(0, metricsHistory.getMetricHistory().size());
342+
assertEquals(1, metricsHistory.getMetricHistory().size());
343343
}
344344

345345
@Test
@@ -351,10 +351,11 @@ public void testClearHistoryOnTopoChange() throws Exception {
351351

352352
setDefaultMetrics(metricsCollector);
353353

354-
// We haven't left the stabilization period
355-
// => no metrics reporting and collection should take place
354+
// We haven't left the stabilization period, we're returning the metrics
355+
// but don't act on it since the metric window is not full yet
356356
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
357-
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
357+
assertFalse(collectedMetrics.getMetricHistory().isEmpty());
358+
assertFalse(collectedMetrics.isFullyCollected());
358359
}
359360

360361
@Test
@@ -571,19 +572,18 @@ public void testMetricCollectionDuringStabilization() throws Exception {
571572
.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMillis(100));
572573
context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, Duration.ofMillis(100));
573574

574-
// Within stabilization period we simply collect metrics but do not return them
575+
// Until window is full (time=200) we keep returning stabilizing metrics
575576
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), ZoneId.systemDefault()));
576-
assertTrue(
577-
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
577+
assertEquals(
578+
1, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
578579
assertEquals(1, stateStore.getCollectedMetrics(context).size());
579580
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), ZoneId.systemDefault()));
580-
assertTrue(
581-
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
581+
assertEquals(
582+
2, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
582583
assertEquals(2, stateStore.getCollectedMetrics(context).size());
583584

584585
testTolerateMetricsMissingDuringStabilizationPhase(topology);
585586

586-
// Until window is full (time=200) we keep returning stabilizing metrics
587587
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), ZoneId.systemDefault()));
588588
assertEquals(
589589
3, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/DateTimeUtilsTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void testConvertInstantToReadableFormat() {
3232
Instant instant = Instant.ofEpochMilli(1702456327000L);
3333
String readableFormat1 = DateTimeUtils.readable(instant, ZoneId.of("Asia/Shanghai"));
3434
String readableFormat2 = DateTimeUtils.readable(instant, ZoneId.of("Europe/Berlin"));
35-
assertThat(readableFormat1).isEqualTo("2023-12-13 16:32:07");
36-
assertThat(readableFormat2).isEqualTo("2023-12-13 09:32:07");
35+
assertThat(readableFormat1).isEqualTo("2023-12-13 16:32:07.000");
36+
assertThat(readableFormat2).isEqualTo("2023-12-13 09:32:07.000");
3737
}
3838
}

0 commit comments

Comments
 (0)