Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/autoscaling/autoscaling-dynamic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ spec:
job.autoscaler.enabled: "true"
job.autoscaler.stabilization.interval: "1m"
job.autoscaler.metrics.window: "15m"
job.autoscaler.target.utilization: "0.5"
job.autoscaler.utilization.target: "0.5"
job.autoscaler.target.utilization.boundary: "0.3"
pipeline.max-parallelism: "32"
taskmanager.numberOfTaskSlots: "4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,24 +149,39 @@ public CollectedMetricHistory updateMetrics(
// Add scaling metrics to history if they were computed successfully
metricHistory.put(now, scalingMetrics);

if (isStabilizing) {
LOG.info("Stabilizing until {}", readable(stableTime));
stateStore.storeCollectedMetrics(ctx, metricHistory);
return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
}

var collectedMetrics = new CollectedMetricHistory(topology, metricHistory, jobRunningTs);
if (now.isBefore(windowFullTime)) {
LOG.info("Metric window not full until {}", readable(windowFullTime));
if (isStabilizing) {
LOG.info("Stabilizing until {}", readable(stableTime));
} else {
LOG.info(
"Metric window is not full until {}. {} samples collected so far",
readable(windowFullTime),
metricHistory.size());
}
} else {
collectedMetrics.setFullyCollected(true);
// Trim metrics outside the metric window from metrics history
metricHistory.headMap(now.minus(metricWindowSize)).clear();
var trimBefore = now.minus(metricWindowSize);
int numDropped = removeMetricsBefore(trimBefore, metricHistory);
LOG.debug(
"Metric window is now full. Dropped {} samples before {}, keeping {}.",
numDropped,
readable(trimBefore),
metricHistory.size());
}
stateStore.storeCollectedMetrics(ctx, metricHistory);
return collectedMetrics;
}

private int removeMetricsBefore(
Instant cutOffTimestamp, SortedMap<Instant, CollectedMetrics> metricHistory) {
var toBeDropped = metricHistory.headMap(cutOffTimestamp);
var numDropped = toBeDropped.size();
toBeDropped.clear();
return numDropped;
}

protected abstract Map<FlinkMetric, Metric> queryJmMetrics(Context ctx) throws Exception;

protected abstract Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ public void testEndToEnd() throws Exception {

setDefaultMetrics(metricsCollector);

// We haven't left the stabilization period
// => no metrics reporting and collection should take place
// We haven't left the stabilization period, but we're collecting and returning the metrics
// for reporting
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
assertEquals(1, collectedMetrics.getMetricHistory().size());

// We haven't collected a full window yet, no metrics should be reported but metrics should
// still get collected.
// We haven't collected a full window yet, but we're collecting and returning the metrics
// for reporting
clock = Clock.offset(clock, conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
metricsCollector.setClock(clock);
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
Expand Down Expand Up @@ -294,12 +294,12 @@ public void testJobDetailsRestCompatibility() throws JsonProcessingException {
public void testMetricCollectorWindow() throws Exception {
setDefaultMetrics(metricsCollector);
var metricsHistory = metricsCollector.updateMetrics(context, stateStore);
assertEquals(0, metricsHistory.getMetricHistory().size());
assertEquals(1, metricsHistory.getMetricHistory().size());

// Not stable, nothing should be collected
// Not stable, metrics collected and reported
metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
assertEquals(0, metricsHistory.getMetricHistory().size());
assertEquals(2, metricsHistory.getMetricHistory().size());

// Update clock to stable time
var conf = context.getConfiguration();
Expand Down Expand Up @@ -336,10 +336,10 @@ public void testMetricCollectorWindow() throws Exception {
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
assertEquals(1, metricsHistory.getMetricHistory().size());

// Existing metrics should be cleared on job updates
// Existing metrics should be cleared on job updates, and start collecting from fresh
metricsCollector.setJobUpdateTs(clock.instant().plus(Duration.ofDays(10)));
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
assertEquals(0, metricsHistory.getMetricHistory().size());
assertEquals(1, metricsHistory.getMetricHistory().size());
}

@Test
Expand All @@ -351,10 +351,11 @@ public void testClearHistoryOnTopoChange() throws Exception {

setDefaultMetrics(metricsCollector);

// We haven't left the stabilization period
// => no metrics reporting and collection should take place
// We haven't left the stabilization period, we're returning the metrics
// but don't act on it since the metric window is not full yet
var collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
assertFalse(collectedMetrics.getMetricHistory().isEmpty());
assertFalse(collectedMetrics.isFullyCollected());
}

@Test
Expand Down Expand Up @@ -571,19 +572,18 @@ public void testMetricCollectionDuringStabilization() throws Exception {
.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMillis(100));
context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, Duration.ofMillis(100));

// Within stabilization period we simply collect metrics but do not return them
// Until window is full (time=200) we keep returning stabilizing metrics
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), ZoneId.systemDefault()));
assertTrue(
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
assertEquals(
1, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
assertEquals(1, stateStore.getCollectedMetrics(context).size());
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), ZoneId.systemDefault()));
assertTrue(
metricsCollector.updateMetrics(context, stateStore).getMetricHistory().isEmpty());
assertEquals(
2, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
assertEquals(2, stateStore.getCollectedMetrics(context).size());

testTolerateMetricsMissingDuringStabilizationPhase(topology);

// Until window is full (time=200) we keep returning stabilizing metrics
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), ZoneId.systemDefault()));
assertEquals(
3, metricsCollector.updateMetrics(context, stateStore).getMetricHistory().size());
Expand Down
Loading