Skip to content

Commit 9c93f04

Browse files
committed
[FLINK-36409] Publish some autoscaler metrics during stabilisation period
1 parent 3a2cfdd commit 9c93f04

File tree

3 files changed

43 additions and 28 deletions

examples/autoscaling/autoscaling-dynamic.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ spec:
2727
job.autoscaler.enabled: "true"
2828
job.autoscaler.stabilization.interval: "1m"
2929
job.autoscaler.metrics.window: "15m"
30-
job.autoscaler.target.utilization: "0.5"
30+
job.autoscaler.utilization.target: "0.5"
3131
job.autoscaler.target.utilization.boundary: "0.3"
3232
pipeline.max-parallelism: "32"
3333
taskmanager.numberOfTaskSlots: "4"

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java

Lines changed: 23 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -149,24 +149,39 @@ public CollectedMetricHistory updateMetrics(
149149
// Add scaling metrics to history if they were computed successfully
150150
metricHistory.put(now, scalingMetrics);
151151

152-
if (isStabilizing) {
153-
LOG.info("Stabilizing until {}", readable(stableTime));
154-
stateStore.storeCollectedMetrics(ctx, metricHistory);
155-
return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
156-
}
157-
158152
var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs);
159153
if (now.isBefore(windowFullTime)) {
160-
LOG.info("Metric window not full until {}", readable(windowFullTime));
154+
if (isStabilizing) {
155+
LOG.info("Stabilizing until {}", readable(stableTime));
156+
} else {
157+
LOG.info(
158+
"Metric window is not full until {}. {} samples collected so far",
159+
readable(windowFullTime),
160+
metricHistory.size());
161+
}
161162
} else {
162163
collectedMetrics.setFullyCollected(true);
163164
// Trim metrics outside the metric window from metrics history
164-
metricHistory.headMap(now.minus(metricWindowSize)).clear();
165+
var trimBefore = now.minus(metricWindowSize);
166+
int numDropped = removeMetricsBefore(trimBefore, metricHistory);
167+
LOG.debug(
168+
"Metric window is now full. Dropped {} samples before {}, keeping {}.",
169+
numDropped,
170+
readable(trimBefore),
171+
metricHistory.size());
165172
}
166173
stateStore.storeCollectedMetrics(ctx, metricHistory);
167174
return collectedMetrics;
168175
}
169176

177+
private int removeMetricsBefore(
178+
Instant cutOffTimestamp, SortedMap<Instant, CollectedMetrics> metricHistory) {
179+
var toBeDropped = metricHistory.headMap(cutOffTimestamp);
180+
var numDropped = toBeDropped.size();
181+
toBeDropped.clear();
182+
return numDropped;
183+
}
184+
170185
protected abstract Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) throws Exception;
171186

172187
protected abstract Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx)

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java

Lines changed: 19 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -129,13 +129,13 @@ public void testEndToEnd() throws Exception {
129129

130130
setDefaultMetrics(metricsCollector);
131131

132-
// We haven't left the stabilization period
133-
// => no metrics reporting and collection should take place
132+
// We haven't left the stabilization period, but we're collecting and returning the metrics
133+
// for reporting
134134
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
135-
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
135+
assertEquals(1, collectedMetrics.getMetricHistory().size());
136136

137-
// We haven't collected a full window yet, no metrics should be reported but metrics should
138-
// still get collected.
137+
// We haven't collected a full window yet, but we're collecting and returning the metrics
138+
// for reporting
139139
clock = Clock.offset(clock, conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
140140
metricsCollector.setClock(clock);
141141
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
@@ -294,12 +294,12 @@ public void testJobDetailsRestCompatibility() throws JsonProcessingException {
294294
public void testMetricCollectorWindow() throws Exception {
295295
setDefaultMetrics(metricsCollector);
296296
var metricsHistory = metricsCollector.updateMetrics(context, stateStore);
297-
assertEquals(0, metricsHistory.getMetricHistory().size());
297+
assertEquals(1, metricsHistory.getMetricHistory().size());
298298

299-
// Not stable, nothing should be collected
299+
// Not stable, metrics collected and reported
300300
metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
301301
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
302-
assertEquals(0, metricsHistory.getMetricHistory().size());
302+
assertEquals(2, metricsHistory.getMetricHistory().size());
303303

304304
// Update clock to stable time
305305
var conf = context.getConfiguration();
@@ -336,10 +336,10 @@ public void testMetricCollectorWindow() throws Exception {
336336
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
337337
assertEquals(1, metricsHistory.getMetricHistory().size());
338338

339-
// Existing metrics should be cleared on job updates
339+
// Existing metrics should be cleared on job updates, and start collecting from fresh
340340
metricsCollector.setJobUpdateTs(clock.instant().plus(Duration.ofDays(10)));
341341
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
342-
assertEquals(0, metricsHistory.getMetricHistory().size());
342+
assertEquals(1, metricsHistory.getMetricHistory().size());
343343
}
344344

345345
@Test
@@ -351,10 +351,11 @@ public void testClearHistoryOnTopoChange() throws Exception {
351351

352352
setDefaultMetrics(metricsCollector);
353353

354-
// We haven't left the stabilization period
355-
// => no metrics reporting and collection should take place
354+
// We haven't left the stabilization period, we're returning the metrics
355+
// but don't act on it since the metric window is not full yet
356356
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
357-
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
357+
assertFalse(collectedMetrics.getMetricHistory().isEmpty());
358+
assertFalse(collectedMetrics.isFullyCollected());
358359
}
359360

360361
@Test
@@ -571,19 +572,18 @@ public void testMetricCollectionDuringStabilization() throws Exception {
571572
.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMillis(100));
572573
context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, Duration.ofMillis(100));
573574

574-
// Within stabilization period we simply collect metrics but do not return them
575+
// Until window is full (time=200) we keep returning stabilizing metrics
575576
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), ZoneId.systemDefault()));
576-
assertTrue(
577-
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
577+
assertEquals(
578+
1, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
578579
assertEquals(1, stateStore.getCollectedMetrics(context).size());
579580
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), ZoneId.systemDefault()));
580-
assertTrue(
581-
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
581+
assertEquals(
582+
2, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
582583
assertEquals(2, stateStore.getCollectedMetrics(context).size());
583584

584585
testTolerateMetricsMissingDuringStabilizationPhase(topology);
585586

586-
// Until window is full (time=200) we keep returning stabilizing metrics
587587
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), ZoneId.systemDefault()));
588588
assertEquals(
589589
3, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());

0 commit comments

Comments
 (0)