Skip to content

Commit 04829bf

Browse files
authored
[FLINK-34212][autoscaler] Autoscaler Standalone cleans up stopped jobs to prevent memory leaks (#824)
1 parent 249a694 commit 04829bf

File tree

2 files changed

+236
-15
lines changed

2 files changed

+236
-15
lines changed

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,18 @@
3535

3636
import java.io.Closeable;
3737
import java.time.Duration;
38+
import java.util.ArrayList;
3839
import java.util.Collection;
40+
import java.util.Collections;
3941
import java.util.HashSet;
42+
import java.util.List;
4043
import java.util.Set;
4144
import java.util.concurrent.CompletableFuture;
4245
import java.util.concurrent.ExecutorService;
4346
import java.util.concurrent.Executors;
4447
import java.util.concurrent.ScheduledExecutorService;
4548
import java.util.concurrent.TimeUnit;
49+
import java.util.stream.Collectors;
4650

4751
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
4852
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
@@ -64,11 +68,17 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6468
private final UnmodifiableConfiguration baseConf;
6569

6670
/**
67-
* Maintain a set of job keys that during scaling, it should be updated at {@link
71+
* Maintain a set of job keys that during scaling, it should be accessed at {@link
6872
* #scheduledExecutorService} thread.
6973
*/
7074
private final Set<KEY> scalingJobKeys;
7175

76+
/**
77+
* Maintain a set of scaling job keys for the last control loop, it should be accessed at {@link
78+
* #scheduledExecutorService} thread.
79+
*/
80+
private Set<KEY> lastScalingKeys;
81+
7282
public StandaloneAutoscalerExecutor(
7383
@Nonnull Configuration conf,
7484
@Nonnull JobListFetcher<KEY, Context> jobListFetcher,
@@ -105,36 +115,67 @@ public void close() {
105115
scalingThreadPool.shutdownNow();
106116
}
107117

118+
/**
119+
* @return All CompletableFuture for all scaling jobs, note: it's only used for test for now.
120+
*/
108121
@VisibleForTesting
109-
protected void scaling() {
122+
protected List<CompletableFuture<Void>> scaling() {
110123
LOG.info("Standalone autoscaler starts scaling.");
111124
Collection<Context> jobList;
112125
try {
113126
jobList = jobListFetcher.fetch(baseConf);
114127
} catch (Throwable e) {
115128
LOG.error("Error while fetch job list.", e);
116-
return;
129+
return Collections.emptyList();
117130
}
118131

132+
cleanupStoppedJob(jobList);
133+
134+
var resultFutures = new ArrayList<CompletableFuture<Void>>();
119135
for (var jobContext : jobList) {
120136
final var jobKey = jobContext.getJobKey();
121137
if (scalingJobKeys.contains(jobKey)) {
122138
continue;
123139
}
124140
scalingJobKeys.add(jobKey);
125-
CompletableFuture.runAsync(() -> scalingSingleJob(jobContext), scalingThreadPool)
126-
.whenCompleteAsync(
127-
(result, throwable) -> {
128-
if (throwable != null) {
129-
LOG.error(
130-
"Error while jobKey: {} executing scaling .",
131-
jobKey,
132-
throwable);
133-
}
134-
scalingJobKeys.remove(jobKey);
135-
},
136-
scheduledExecutorService);
141+
resultFutures.add(
142+
CompletableFuture.runAsync(
143+
() -> scalingSingleJob(jobContext), scalingThreadPool)
144+
.whenCompleteAsync(
145+
(result, throwable) -> {
146+
if (throwable != null) {
147+
LOG.error(
148+
"Error while jobKey: {} executing scaling .",
149+
jobKey,
150+
throwable);
151+
}
152+
scalingJobKeys.remove(jobKey);
153+
if (!lastScalingKeys.contains(jobKey)) {
154+
// Current job has been stopped. lastScalingKeys doesn't
155+
// contain jobKey means current job key was scaled in a
156+
// previous control loop, and current job is stopped in
157+
// the latest control loop.
158+
autoScaler.cleanup(jobKey);
159+
}
160+
},
161+
scheduledExecutorService));
162+
}
163+
return resultFutures;
164+
}
165+
166+
private void cleanupStoppedJob(Collection<Context> jobList) {
167+
var currentScalingKeys =
168+
jobList.stream().map(JobAutoScalerContext::getJobKey).collect(Collectors.toSet());
169+
if (lastScalingKeys != null) {
170+
lastScalingKeys.removeAll(currentScalingKeys);
171+
for (KEY jobKey : lastScalingKeys) {
172+
// Current job may be scaling, and cleanup should happen after scaling.
173+
if (!scalingJobKeys.contains(jobKey)) {
174+
autoScaler.cleanup(jobKey);
175+
}
176+
}
137177
}
178+
lastScalingKeys = currentScalingKeys;
138179
}
139180

140181
@VisibleForTesting

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.autoscaler.JobAutoScalerContext;
2424
import org.apache.flink.autoscaler.event.TestingEventCollector;
2525
import org.apache.flink.configuration.Configuration;
26+
import org.apache.flink.util.concurrent.FutureUtils;
2627

2728
import org.junit.jupiter.api.Test;
2829
import org.junit.jupiter.params.ParameterizedTest;
@@ -33,8 +34,10 @@
3334
import java.util.Collections;
3435
import java.util.HashMap;
3536
import java.util.List;
37+
import java.util.Map;
3638
import java.util.Set;
3739
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.ConcurrentHashMap;
3841
import java.util.concurrent.CountDownLatch;
3942
import java.util.concurrent.atomic.AtomicLong;
4043

@@ -227,6 +230,183 @@ public void cleanup(JobID jobID) {
227230
}
228231
}
229232

233+
@Test
234+
void testCleanupAfterStopped() throws Exception {
235+
var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
236+
237+
var job1 = createJobAutoScalerContext();
238+
var job2 = createJobAutoScalerContext();
239+
var scaleCounter = new ConcurrentHashMap<JobID, Integer>();
240+
var cleanupCounter = new ConcurrentHashMap<JobID, Integer>();
241+
242+
var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
243+
244+
try (var autoscalerExecutor =
245+
new StandaloneAutoscalerExecutor<>(
246+
new Configuration(),
247+
baseConf -> jobList,
248+
eventCollector,
249+
new JobAutoScaler<>() {
250+
@Override
251+
public void scale(JobAutoScalerContext<JobID> context) {
252+
scaleCounter.put(
253+
context.getJobKey(),
254+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
255+
}
256+
257+
@Override
258+
public void cleanup(JobID jobID) {
259+
cleanupCounter.put(
260+
jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
261+
}
262+
})) {
263+
264+
// Test for empty job list.
265+
var scaledFutures = autoscalerExecutor.scaling();
266+
assertThat(scaledFutures).isEmpty();
267+
FutureUtils.waitForAll(scaledFutures).get();
268+
assertThat(scaleCounter).isEmpty();
269+
assertThat(cleanupCounter).isEmpty();
270+
271+
// Test for 2 jobs twice.
272+
jobList.add(job1);
273+
jobList.add(job2);
274+
275+
scaledFutures = autoscalerExecutor.scaling();
276+
assertThat(scaledFutures).hasSize(2);
277+
FutureUtils.waitForAll(scaledFutures).get();
278+
assertThat(scaleCounter)
279+
.containsExactlyInAnyOrderEntriesOf(
280+
Map.of(job1.getJobKey(), 1, job2.getJobKey(), 1));
281+
assertThat(cleanupCounter).isEmpty();
282+
283+
scaledFutures = autoscalerExecutor.scaling();
284+
assertThat(scaledFutures).hasSize(2);
285+
FutureUtils.waitForAll(scaledFutures).get();
286+
assertThat(scaleCounter)
287+
.containsExactlyInAnyOrderEntriesOf(
288+
Map.of(job1.getJobKey(), 2, job2.getJobKey(), 2));
289+
assertThat(cleanupCounter).isEmpty();
290+
291+
// Test for 1 job twice.
292+
jobList.clear();
293+
jobList.add(job2);
294+
295+
scaledFutures = autoscalerExecutor.scaling();
296+
assertThat(scaledFutures).hasSize(1);
297+
FutureUtils.waitForAll(scaledFutures).get();
298+
assertThat(scaleCounter)
299+
.containsExactlyInAnyOrderEntriesOf(
300+
Map.of(job1.getJobKey(), 2, job2.getJobKey(), 3));
301+
assertThat(cleanupCounter)
302+
.containsExactlyInAnyOrderEntriesOf(Map.of(job1.getJobKey(), 1));
303+
304+
scaledFutures = autoscalerExecutor.scaling();
305+
assertThat(scaledFutures).hasSize(1);
306+
FutureUtils.waitForAll(scaledFutures).get();
307+
assertThat(scaleCounter)
308+
.containsExactlyInAnyOrderEntriesOf(
309+
Map.of(job1.getJobKey(), 2, job2.getJobKey(), 4));
310+
// Only cleanup once.
311+
assertThat(cleanupCounter)
312+
.containsExactlyInAnyOrderEntriesOf(Map.of(job1.getJobKey(), 1));
313+
}
314+
}
315+
316+
@Test
317+
void testCleanupForStoppedJobAfterScaling() throws Exception {
318+
var eventCollector = new TestingEventCollector<JobID, JobAutoScalerContext<JobID>>();
319+
320+
var job1 = createJobAutoScalerContext();
321+
var job2 = createJobAutoScalerContext();
322+
var scaleCounter = new ConcurrentHashMap<JobID, Integer>();
323+
var cleanupCounter = new ConcurrentHashMap<JobID, Integer>();
324+
325+
var job1StartWaitFuture = new CompletableFuture<>();
326+
var job1WaitFuture = new CompletableFuture<>();
327+
328+
var jobList = new ArrayList<JobAutoScalerContext<JobID>>();
329+
330+
try (var autoscalerExecutor =
331+
new StandaloneAutoscalerExecutor<>(
332+
new Configuration(),
333+
baseConf -> jobList,
334+
eventCollector,
335+
new JobAutoScaler<>() {
336+
@Override
337+
public void scale(JobAutoScalerContext<JobID> context)
338+
throws Exception {
339+
scaleCounter.put(
340+
context.getJobKey(),
341+
scaleCounter.getOrDefault(context.getJobKey(), 0) + 1);
342+
if (context == job1) {
343+
job1StartWaitFuture.complete(null);
344+
job1WaitFuture.get();
345+
}
346+
}
347+
348+
@Override
349+
public void cleanup(JobID jobID) {
350+
cleanupCounter.put(
351+
jobID, cleanupCounter.getOrDefault(jobID, 0) + 1);
352+
}
353+
})) {
354+
355+
// Test for job1 and job2
356+
jobList.add(job1);
357+
jobList.add(job2);
358+
359+
var scaledFutures = autoscalerExecutor.scaling();
360+
var job1ScaledFuture = scaledFutures.get(0);
361+
362+
assertThat(scaledFutures).hasSize(2);
363+
// wait for job2 scaling is finished.
364+
scaledFutures.get(1).get();
365+
// wait for job1 starts wait.
366+
job1StartWaitFuture.get();
367+
assertThat(scaleCounter)
368+
.containsExactlyInAnyOrderEntriesOf(
369+
Map.of(job1.getJobKey(), 1, job2.getJobKey(), 1));
370+
assertThat(cleanupCounter).isEmpty();
371+
372+
scalingOnlyHappenForJob2(
373+
job1, job2, scaleCounter, cleanupCounter, autoscalerExecutor, 2);
374+
375+
// Test for job2 twice, and job1 should be clean up after scaling.
376+
jobList.clear();
377+
jobList.add(job2);
378+
379+
scalingOnlyHappenForJob2(
380+
job1, job2, scaleCounter, cleanupCounter, autoscalerExecutor, 3);
381+
scalingOnlyHappenForJob2(
382+
job1, job2, scaleCounter, cleanupCounter, autoscalerExecutor, 4);
383+
384+
// Wait for job1 scaling to complete.
385+
job1WaitFuture.complete(null);
386+
job1ScaledFuture.get();
387+
assertThat(cleanupCounter)
388+
.containsExactlyInAnyOrderEntriesOf(Map.of(job1.getJobKey(), 1));
389+
}
390+
}
391+
392+
private void scalingOnlyHappenForJob2(
393+
JobAutoScalerContext<JobID> job1,
394+
JobAutoScalerContext<JobID> job2,
395+
ConcurrentHashMap<JobID, Integer> scaleCounter,
396+
ConcurrentHashMap<JobID, Integer> cleanupCounter,
397+
StandaloneAutoscalerExecutor<JobID, JobAutoScalerContext<JobID>> autoscalerExecutor,
398+
int expectedJob2ScaleCounter)
399+
throws Exception {
400+
var scaledFutures = autoscalerExecutor.scaling();
401+
assertThat(scaledFutures).hasSize(1);
402+
FutureUtils.waitForAll(scaledFutures).get();
403+
assertThat(scaleCounter)
404+
.containsExactlyInAnyOrderEntriesOf(
405+
Map.of(job1.getJobKey(), 1, job2.getJobKey(), expectedJob2ScaleCounter));
406+
// job1 is still scaling, it cannot be cleaned up.
407+
assertThat(cleanupCounter).isEmpty();
408+
}
409+
230410
private static JobAutoScalerContext<JobID> createJobAutoScalerContext() {
231411
var jobID = new JobID();
232412
return new JobAutoScalerContext<>(

0 commit comments

Comments
 (0)