diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java index dfadc9ac38..1ed3ad96e7 100644 --- a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java +++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java @@ -247,7 +247,8 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) { @Override public void clearAll(Context jobContext) { - jdbcStateStore.clearAll(getSerializeKey(jobContext)); + var serializedKey = getSerializeKey(jobContext); + jdbcStateStore.clearAll(serializedKey); } @Override @@ -255,11 +256,6 @@ public void flush(Context jobContext) throws Exception { jdbcStateStore.flush(getSerializeKey(jobContext)); } - @Override - public void removeInfoFromCache(KEY jobKey) { - jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey)); - } - private String getSerializeKey(Context jobContext) { return getSerializeKey(jobContext.getJobKey()); } diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java index 87875ec192..a87de96412 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java @@ -40,12 +40,15 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL; @@ -74,10 +77,10 @@ public class StandaloneAutoscalerExecutor scalingJobKeys; /** - * Maintain a set of scaling job keys for the last control loop, it should be accessed at {@link + * Maintain a map of scaling job keys for the last control loop, it should be accessed at {@link * #scheduledExecutorService} thread. */ - private Set lastScalingKeys; + private Map lastScaling; public StandaloneAutoscalerExecutor( @Nonnull Configuration conf, @@ -151,12 +154,12 @@ protected List> scaling() { throwable); } scalingJobKeys.remove(jobKey); - if (!lastScalingKeys.contains(jobKey)) { + if (!lastScaling.containsKey(jobKey)) { - // Current job has been stopped. lastScalingKeys doesn't + // Current job has been stopped. lastScaling doesn't // contain jobKey means current job key was scaled in a // previous control loop, and current job is stopped in // the latest control loop. - autoScaler.cleanup(jobKey); + autoScaler.cleanup(jobContext); } }, scheduledExecutorService)); @@ -165,18 +168,21 @@ protected List> scaling() { } private void cleanupStoppedJob(Collection jobList) { - var currentScalingKeys = - jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet()); - if (lastScalingKeys != null) { - lastScalingKeys.removeAll(currentScalingKeys); - for (KEY jobKey : lastScalingKeys) { + var jobs = + jobList.stream() + .collect( + Collectors.toMap( + JobAutoScalerContext::getJobKey, Function.identity())); + if (lastScaling != null) { + jobs.keySet().forEach(lastScaling::remove); + for (Map.Entry job : lastScaling.entrySet()) { // Current job may be scaling, and cleanup should happen after scaling.
- if (!scalingJobKeys.contains(jobKey)) { - autoScaler.cleanup(jobKey); + if (!scalingJobKeys.contains(job.getKey())) { + autoScaler.cleanup(job.getValue()); } } } - lastScalingKeys = currentScalingKeys; + lastScaling = new ConcurrentHashMap<>(jobs); } @VisibleForTesting diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java index caffdbbbec..b17d4ba220 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java @@ -80,7 +80,7 @@ public void scale(JobAutoScalerContext context) { } @Override - public void cleanup(JobID jobKey) { + public void cleanup(JobAutoScalerContext context) { fail("Should be called."); } }; @@ -126,7 +126,7 @@ public void scale(JobAutoScalerContext context) { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext context) { fail("Should be called."); } })) { @@ -164,7 +164,7 @@ public void scale(JobAutoScalerContext context) } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext context) { fail("Should be called."); } })) { @@ -221,7 +221,7 @@ public void scale(JobAutoScalerContext context) } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext context) { fail("Should be called."); } })) { @@ -255,7 +255,8 @@ public void scale(JobAutoScalerContext context) { } @Override - public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext context) { + var jobID = context.getJobKey(); cleanupCounter.put( jobID, cleanupCounter.getOrDefault(jobID, 0) + 1); } @@ -346,7 +347,8 @@ public void scale(JobAutoScalerContext context) } @Override - 
public void cleanup(JobID jobID) { + public void cleanup(JobAutoScalerContext context) { + var jobID = context.getJobKey(); cleanupCounter.put( jobID, cleanupCounter.getOrDefault(jobID, 0) + 1); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java index f31fac107e..9c5c374974 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java @@ -39,7 +39,7 @@ public interface JobAutoScaler> { /** * Called when the job is deleted. * - * @param jobKey Job key. + * @param context Job context. */ - void cleanup(KEY jobKey); + void cleanup(Context context); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index f3af3baa61..d0e29e677b 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -118,12 +118,17 @@ public void scale(Context ctx) throws Exception { } @Override - public void cleanup(KEY jobKey) { + public void cleanup(Context ctx) { LOG.info("Cleaning up autoscaling meta data"); - metricsCollector.cleanup(jobKey); - lastEvaluatedMetrics.remove(jobKey); - flinkMetrics.remove(jobKey); - stateStore.removeInfoFromCache(jobKey); + metricsCollector.cleanup(ctx.getJobKey()); + lastEvaluatedMetrics.remove(ctx.getJobKey()); + flinkMetrics.remove(ctx.getJobKey()); + try { + stateStore.clearAll(ctx); + stateStore.flush(ctx); + } catch (Exception e) { + LOG.error("Error cleaning up autoscaling meta data for {}", ctx.getJobKey(), e); + } } @VisibleForTesting diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java index a7a66f14cb..91ebfd0cd1 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java @@ -24,5 +24,5 @@ public class NoopJobAutoscaler> public void scale(Context context) throws Exception {} @Override - public void cleanup(KEY jobKey) {} + public void cleanup(Context jobKey) {} } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java index 15581b7584..64e4dd5bc2 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java @@ -93,7 +93,4 @@ void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown * was changed through this interface. */ void flush(Context jobContext) throws Exception; - - /** Clean up all information related to the current job. 
*/ - void removeInfoFromCache(KEY jobKey); } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java index cf6aa74f1a..c660f73b64 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.autoscaler.DelayedScaleDown; import org.apache.flink.autoscaler.JobAutoScalerContext; import org.apache.flink.autoscaler.ScalingSummary; @@ -34,6 +35,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; /** * State store based on the Java Heap, the state will be discarded after process restarts. @@ -177,10 +179,16 @@ public void flush(Context jobContext) { // The InMemory state store doesn't persist data. 
} - @Override - public void removeInfoFromCache(KEY jobKey) { - scalingHistoryStore.remove(jobKey); - collectedMetricsStore.remove(jobKey); - parallelismOverridesStore.remove(jobKey); + @VisibleForTesting + public boolean hasDataFor(Context jobContext) { + var k = jobContext.getJobKey(); + return Stream.of( + scalingHistoryStore, + parallelismOverridesStore, + collectedMetricsStore, + tmConfigOverrides, + scalingTrackingStore, + delayedScaleDownStore) + .anyMatch(m -> m.containsKey(k)); } } diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java index c9d74e65d8..552971d7a5 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java @@ -27,7 +27,6 @@ import org.apache.flink.autoscaler.metrics.TestMetrics; import org.apache.flink.autoscaler.realizer.ScalingRealizer; import org.apache.flink.autoscaler.realizer.TestingScalingRealizer; -import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore; import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.autoscaler.topology.VertexInfo; @@ -72,7 +71,7 @@ public class JobAutoScalerImplTest { private JobAutoScalerContext context; private TestingScalingRealizer> scalingRealizer; private TestingEventCollector> eventCollector; - private AutoScalerStateStore> stateStore; + private InMemoryAutoScalerStateStore> stateStore; @BeforeEach public void setup() { @@ -232,7 +231,12 @@ public void realizeParallelismOverrides( void testParallelismOverrides() throws Exception { var autoscaler = new JobAutoScalerImpl<>( - null, null, null, eventCollector, scalingRealizer, stateStore); + new TestingMetricsCollector<>(new JobTopology()), + null, + null, + eventCollector, + scalingRealizer, 
+ stateStore); - // Initially we should return empty overrides, do not crate any state + // Initially we should return empty overrides, do not create any state assertThat(autoscaler.getParallelismOverrides(context)).isEmpty(); @@ -282,6 +286,17 @@ void testParallelismOverrides() throws Exception { context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "asd"); autoscaler.scale(context); assertParallelismOverrides(Map.of(v1, "1", v2, "2")); + + context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true"); + autoscaler.scale(context); + assertParallelismOverrides(Map.of(v1, "1", v2, "2")); + + // Make sure cleanup removes everything + assertTrue(stateStore.hasDataFor(context)); + autoscaler.cleanup(context); + assertFalse(stateStore.hasDataFor(context)); + autoscaler.scale(context); + assertParallelismOverrides(null); } @Test diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java index 1059b96ab0..da445973d1 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java @@ -266,11 +266,6 @@ public void flush(KubernetesJobAutoScalerContext jobContext) { configMapStore.flush(jobContext); } - @Override - public void removeInfoFromCache(ResourceID resourceID) { - configMapStore.removeInfoFromCache(resourceID); - } - @SneakyThrows protected static String serializeScalingHistory( Map> scalingHistory) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index
757e1a7cd7..3b2048c9f7 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -276,7 +276,7 @@ protected abstract boolean reconcileSpecChange( @Override public DeleteControl cleanup(FlinkResourceContext ctx) { - autoscaler.cleanup(ResourceID.fromResource(ctx.getResource())); + autoscaler.cleanup(ctx.getJobAutoScalerContext()); return cleanupInternal(ctx); }