Skip to content

Commit 7a9bff7

Browse files
committed
[FLINK-37086] Clean up should remove all autoscaler state
1 parent 747d80a commit 7a9bff7

File tree

11 files changed

+73
-49
lines changed

11 files changed

+73
-49
lines changed

flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JdbcAutoScalerStateStore.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,19 +247,15 @@ public DelayedScaleDown getDelayedScaleDown(Context jobContext) {
247247

248248
@Override
249249
public void clearAll(Context jobContext) {
250-
jdbcStateStore.clearAll(getSerializeKey(jobContext));
250+
var serializedKey = getSerializeKey(jobContext);
251+
jdbcStateStore.clearAll(serializedKey);
251252
}
252253

253254
@Override
254255
public void flush(Context jobContext) throws Exception {
255256
jdbcStateStore.flush(getSerializeKey(jobContext));
256257
}
257258

258-
@Override
259-
public void removeInfoFromCache(KEY jobKey) {
260-
jdbcStateStore.removeInfoFromCache(getSerializeKey(jobKey));
261-
}
262-
263259
private String getSerializeKey(Context jobContext) {
264260
return getSerializeKey(jobContext.getJobKey());
265261
}

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,15 @@
4040
import java.util.Collections;
4141
import java.util.HashSet;
4242
import java.util.List;
43+
import java.util.Map;
4344
import java.util.Set;
4445
import java.util.concurrent.CompletableFuture;
46+
import java.util.concurrent.ConcurrentHashMap;
4547
import java.util.concurrent.ExecutorService;
4648
import java.util.concurrent.Executors;
4749
import java.util.concurrent.ScheduledExecutorService;
4850
import java.util.concurrent.TimeUnit;
51+
import java.util.function.Function;
4952
import java.util.stream.Collectors;
5053

5154
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
@@ -74,10 +77,10 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
7477
private final Set<KEY> scalingJobKeys;
7578

7679
/**
77-
* Maintain a set of scaling job keys for the last control loop, it should be accessed at {@link
80+
* Maintain a map of scaling job keys for the last control loop, it should be accessed at {@link
7881
* #scheduledExecutorService} thread.
7982
*/
80-
private Set<KEY> lastScalingKeys;
83+
private Map<KEY, Context> lastScaling;
8184

8285
public StandaloneAutoscalerExecutor(
8386
@Nonnull Configuration conf,
@@ -151,12 +154,12 @@ protected List<CompletableFuture<Void>> scaling() {
151154
throwable);
152155
}
153156
scalingJobKeys.remove(jobKey);
154-
if (!lastScalingKeys.contains(jobKey)) {
157+
if (!lastScaling.containsKey(jobKey)) {
155158
// Current job has been stopped. lastScalingKeys doesn't
156159
// contain jobKey means current job key was scaled in a
157160
// previous control loop, and current job is stopped in
158161
// the latest control loop.
159-
autoScaler.cleanup(jobKey);
162+
autoScaler.cleanup(jobContext);
160163
}
161164
},
162165
scheduledExecutorService));
@@ -165,18 +168,21 @@ protected List<CompletableFuture<Void>> scaling() {
165168
}
166169

167170
private void cleanupStoppedJob(Collection<Context> jobList) {
168-
var currentScalingKeys =
169-
jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet());
170-
if (lastScalingKeys != null) {
171-
lastScalingKeys.removeAll(currentScalingKeys);
172-
for (KEY jobKey : lastScalingKeys) {
171+
var jobs =
172+
jobList.stream()
173+
.collect(
174+
Collectors.toMap(
175+
JobAutoScalerContext::getJobKey, Function.identity()));
176+
if (lastScaling != null) {
177+
jobs.keySet().forEach(lastScaling::remove);
178+
for (Map.Entry<KEY, Context> job : lastScaling.entrySet()) {
173179
// Current job may be scaling, and cleanup should happen after scaling.
174-
if (!scalingJobKeys.contains(jobKey)) {
175-
autoScaler.cleanup(jobKey);
180+
if (!scalingJobKeys.contains(job.getKey())) {
181+
autoScaler.cleanup(job.getValue());
176182
}
177183
}
178184
}
179-
lastScalingKeys = currentScalingKeys;
185+
lastScaling = new ConcurrentHashMap<>(jobs);
180186
}
181187

182188
@VisibleForTesting

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void scale(JobAutoScalerContext<JobID> context) {
8080
}
8181

8282
@Override
83-
public void cleanup(JobID jobKey) {
83+
public void cleanup(JobAutoScalerContext<JobID> context) {
8484
fail("Should be called.");
8585
}
8686
};
@@ -126,7 +126,7 @@ public void scale(JobAutoScalerContext<JobID> context) {
126126
}
127127

128128
@Override
129-
public void cleanup(JobID jobID) {
129+
public void cleanup(JobAutoScalerContext<JobID> context) {
130130
fail("Should be called.");
131131
}
132132
})) {
@@ -164,7 +164,7 @@ public void scale(JobAutoScalerContext<JobID> context)
164164
}
165165

166166
@Override
167-
public void cleanup(JobID jobID) {
167+
public void cleanup(JobAutoScalerContext<JobID> context) {
168168
fail("Should be called.");
169169
}
170170
})) {
@@ -221,7 +221,7 @@ public void scale(JobAutoScalerContext<JobID> context)
221221
}
222222

223223
@Override
224-
public void cleanup(JobID jobID) {
224+
public void cleanup(JobAutoScalerContext<JobID> context) {
225225
fail("Should be called.");
226226
}
227227
})) {
@@ -255,7 +255,8 @@ public void scale(JobAutoScalerContext<JobID> context) {
255255
}
256256

257257
@Override
258-
public void cleanup(JobID jobID) {
258+
public void cleanup(JobAutoScalerContext<JobID> context) {
259+
var jobID = context.getJobKey();
259260
cleanupCounter.put(
260261
jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
261262
}
@@ -346,7 +347,8 @@ public void scale(JobAutoScalerContext<JobID> context)
346347
}
347348

348349
@Override
349-
public void cleanup(JobID jobID) {
350+
public void cleanup(JobAutoScalerContext<JobID> context) {
351+
var jobID = context.getJobKey();
350352
cleanupCounter.put(
351353
jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
352354
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
3939
/**
4040
* Called when the job is deleted.
4141
*
42-
* @param jobKey Job key.
42+
* @param context Job context.
4343
*/
44-
void cleanup(KEY jobKey);
44+
void cleanup(Context context);
4545
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,17 @@ public void scale(Context ctx) throws Exception {
118118
}
119119

120120
@Override
121-
public void cleanup(KEY jobKey) {
121+
public void cleanup(Context ctx) {
122122
LOG.info("Cleaning up autoscaling meta data");
123-
metricsCollector.cleanup(jobKey);
124-
lastEvaluatedMetrics.remove(jobKey);
125-
flinkMetrics.remove(jobKey);
126-
stateStore.removeInfoFromCache(jobKey);
123+
metricsCollector.cleanup(ctx.getJobKey());
124+
lastEvaluatedMetrics.remove(ctx.getJobKey());
125+
flinkMetrics.remove(ctx.getJobKey());
126+
try {
127+
stateStore.clearAll(ctx);
128+
stateStore.flush(ctx);
129+
} catch (Exception e) {
130+
LOG.error("Error cleaning up autoscaling meta data for {}", ctx.getJobKey(), e);
131+
}
127132
}
128133

129134
@VisibleForTesting

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/NoopJobAutoscaler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ public class NoopJobAutoscaler<KEY, Context extends JobAutoScalerContext<KEY>>
2424
public void scale(Context context) throws Exception {}
2525

2626
@Override
27-
public void cleanup(KEY jobKey) {}
27+
public void cleanup(Context jobKey) {}
2828
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,4 @@ void storeDelayedScaleDown(Context jobContext, DelayedScaleDown delayedScaleDown
9393
* was changed through this interface.
9494
*/
9595
void flush(Context jobContext) throws Exception;
96-
97-
/** Clean up all information related to the current job. */
98-
void removeInfoFromCache(KEY jobKey);
9996
}

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.autoscaler.state;
1919

20+
import org.apache.flink.annotation.VisibleForTesting;
2021
import org.apache.flink.autoscaler.DelayedScaleDown;
2122
import org.apache.flink.autoscaler.JobAutoScalerContext;
2223
import org.apache.flink.autoscaler.ScalingSummary;
@@ -34,6 +35,7 @@
3435
import java.util.SortedMap;
3536
import java.util.TreeMap;
3637
import java.util.concurrent.ConcurrentHashMap;
38+
import java.util.stream.Stream;
3739

3840
/**
3941
* State store based on the Java Heap, the state will be discarded after process restarts.
@@ -177,10 +179,16 @@ public void flush(Context jobContext) {
177179
// The InMemory state store doesn't persist data.
178180
}
179181

180-
@Override
181-
public void removeInfoFromCache(KEY jobKey) {
182-
scalingHistoryStore.remove(jobKey);
183-
collectedMetricsStore.remove(jobKey);
184-
parallelismOverridesStore.remove(jobKey);
182+
@VisibleForTesting
183+
public boolean hasDataFor(Context jobContext) {
184+
var k = jobContext.getJobKey();
185+
return Stream.of(
186+
scalingHistoryStore,
187+
parallelismOverridesStore,
188+
collectedMetricsStore,
189+
tmConfigOverrides,
190+
scalingTrackingStore,
191+
delayedScaleDownStore)
192+
.anyMatch(m -> m.containsKey(k));
185193
}
186194
}

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.autoscaler.metrics.TestMetrics;
2828
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
2929
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
30-
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
3130
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
3231
import org.apache.flink.autoscaler.topology.JobTopology;
3332
import org.apache.flink.autoscaler.topology.VertexInfo;
@@ -72,7 +71,7 @@ public class JobAutoScalerImplTest {
7271
private JobAutoScalerContext<JobID> context;
7372
private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer;
7473
private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
75-
private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
74+
private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
7675

7776
@BeforeEach
7877
public void setup() {
@@ -232,7 +231,12 @@ public void realizeParallelismOverrides(
232231
void testParallelismOverrides() throws Exception {
233232
var autoscaler =
234233
new JobAutoScalerImpl<>(
235-
null, null, null, eventCollector, scalingRealizer, stateStore);
234+
new TestingMetricsCollector<>(new JobTopology()),
235+
null,
236+
null,
237+
eventCollector,
238+
scalingRealizer,
239+
stateStore);
236240

237241
// Initially we should return empty overrides, do not crate any state
238242
assertThat(autoscaler.getParallelismOverrides(context)).isEmpty();
@@ -282,6 +286,17 @@ void testParallelismOverrides() throws Exception {
282286
context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "asd");
283287
autoscaler.scale(context);
284288
assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
289+
290+
context.getConfiguration().setString(AUTOSCALER_ENABLED.key(), "true");
291+
autoscaler.scale(context);
292+
assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
293+
294+
// Make sure cleanup removes everything
295+
assertTrue(stateStore.hasDataFor(context));
296+
autoscaler.cleanup(context);
297+
assertFalse(stateStore.hasDataFor(context));
298+
autoscaler.scale(context);
299+
assertParallelismOverrides(null);
285300
}
286301

287302
@Test

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/state/KubernetesAutoScalerStateStore.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,6 @@ public void flush(KubernetesJobAutoScalerContext jobContext) {
266266
configMapStore.flush(jobContext);
267267
}
268268

269-
@Override
270-
public void removeInfoFromCache(ResourceID resourceID) {
271-
configMapStore.removeInfoFromCache(resourceID);
272-
}
273-
274269
@SneakyThrows
275270
protected static String serializeScalingHistory(
276271
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {

0 commit comments

Comments
 (0)