Skip to content

Commit 75eb206

Browse files
committed
[FLINK-34906][autoscaler] Only scale when all tasks are running
1 parent be3b79b commit 75eb206

File tree

6 files changed

+225
-139
lines changed

6 files changed

+225
-139
lines changed

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
import org.apache.flink.api.common.JobID;
2121
import org.apache.flink.autoscaler.JobAutoScalerContext;
2222
import org.apache.flink.autoscaler.standalone.JobListFetcher;
23+
import org.apache.flink.autoscaler.utils.JobStatusUtils;
2324
import org.apache.flink.client.program.rest.RestClusterClient;
2425
import org.apache.flink.configuration.Configuration;
2526
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
2627
import org.apache.flink.runtime.client.JobStatusMessage;
28+
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
2729
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
2830
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
31+
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
2932
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
3033
import org.apache.flink.util.function.FunctionWithException;
3134

@@ -54,7 +57,11 @@ public FlinkClusterJobListFetcher(
5457
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
5558
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
5659
return restClusterClient
57-
.listJobs()
60+
.sendRequest(
61+
JobsOverviewHeaders.getInstance(),
62+
EmptyMessageParameters.getInstance(),
63+
EmptyRequestBody.getInstance())
64+
.thenApply(JobStatusUtils::toJobStatusMessage)
5865
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
5966
.stream()
6067
.map(

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java

Lines changed: 75 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import org.apache.flink.api.common.JobStatus;
2222
import org.apache.flink.client.program.rest.RestClusterClient;
2323
import org.apache.flink.configuration.Configuration;
24-
import org.apache.flink.runtime.client.JobStatusMessage;
24+
import org.apache.flink.runtime.execution.ExecutionState;
2525
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
26+
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
27+
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
2628
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
2729
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
30+
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
2831
import org.apache.flink.runtime.rest.messages.MessageHeaders;
2932
import org.apache.flink.runtime.rest.messages.MessageParameters;
3033
import org.apache.flink.runtime.rest.messages.RequestBody;
@@ -38,13 +41,12 @@
3841
import javax.annotation.Nullable;
3942

4043
import java.time.Duration;
41-
import java.time.Instant;
42-
import java.util.Collection;
43-
import java.util.List;
44+
import java.util.Arrays;
4445
import java.util.Map;
4546
import java.util.concurrent.CompletableFuture;
4647
import java.util.concurrent.TimeoutException;
4748
import java.util.concurrent.atomic.AtomicLong;
49+
import java.util.stream.Collectors;
4850

4951
import static org.assertj.core.api.Assertions.assertThat;
5052
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,12 +58,9 @@ class FlinkClusterJobListFetcherTest {
5658
/** Test whether the job list and confs are expected. */
5759
@Test
5860
void testFetchJobListAndConfigurationInfo() throws Exception {
59-
var job1 =
60-
new JobStatusMessage(
61-
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
62-
var job2 =
63-
new JobStatusMessage(
64-
new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
61+
var job1 = new JobID();
62+
var job2 = new JobID();
63+
var jobs = Map.of(job1, JobStatus.RUNNING, job2, JobStatus.CANCELLING);
6564

6665
Configuration expectedConf1 = new Configuration();
6766
expectedConf1.setString("option_key1", "option_value1");
@@ -70,18 +69,17 @@ void testFetchJobListAndConfigurationInfo() throws Exception {
7069
expectedConf2.setString("option_key2", "option_value2");
7170
expectedConf2.setString("option_key3", "option_value3");
7271

73-
var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
74-
var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
72+
var configurations = Map.of(job1, expectedConf1, job2, expectedConf2);
7573
var closeCounter = new AtomicLong();
7674
FlinkClusterJobListFetcher jobListFetcher =
7775
new FlinkClusterJobListFetcher(
7876
getRestClusterClient(
79-
Either.Left(List.of(job1, job2)),
77+
Either.Left(jobs),
8078
Either.Left(
8179
Map.of(
82-
job1.getJobId(),
80+
job1,
8381
ConfigurationInfo.from(expectedConf1),
84-
job2.getJobId(),
82+
job2,
8583
ConfigurationInfo.from(expectedConf2))),
8684
closeCounter),
8785
Duration.ofSeconds(10));
@@ -94,11 +92,9 @@ void testFetchJobListAndConfigurationInfo() throws Exception {
9492

9593
assertThat(fetchedJobList).hasSize(2);
9694
for (var jobContext : fetchedJobList) {
97-
JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
95+
var expectedJobState = jobs.get(jobContext.getJobID());
9896
Configuration expectedConf = configurations.get(jobContext.getJobID());
99-
assertThat(expectedJobStatusMessage).isNotNull();
100-
assertThat(jobContext.getJobStatus())
101-
.isEqualTo(expectedJobStatusMessage.getJobState());
97+
assertThat(jobContext.getJobStatus()).isEqualTo(expectedJobState);
10298
assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
10399
}
104100
}
@@ -130,16 +126,13 @@ void testFetchJobListException() {
130126
*/
131127
@Test
132128
void testFetchConfigurationException() {
133-
var job1 =
134-
new JobStatusMessage(
135-
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
136129
var expectedException = new RuntimeException("Expected exception.");
137130
var closeCounter = new AtomicLong();
138131

139132
FlinkClusterJobListFetcher jobListFetcher =
140133
new FlinkClusterJobListFetcher(
141134
getRestClusterClient(
142-
Either.Left(List.of(job1)),
135+
Either.Left(Map.of(new JobID(), JobStatus.RUNNING)),
143136
Either.Right(expectedException),
144137
closeCounter),
145138
Duration.ofSeconds(10));
@@ -171,14 +164,12 @@ void testFetchJobListTimeout() {
171164
*/
172165
@Test
173166
void testFetchConfigurationTimeout() {
174-
var job1 =
175-
new JobStatusMessage(
176-
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
177167
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
178168

179169
FlinkClusterJobListFetcher jobListFetcher =
180170
new FlinkClusterJobListFetcher(
181-
getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
171+
getTimeoutableRestClusterClient(
172+
Map.of(new JobID(), JobStatus.RUNNING), null, closeFuture),
182173
Duration.ofSeconds(2));
183174

184175
assertThat(closeFuture).isNotDone();
@@ -189,16 +180,16 @@ void testFetchConfigurationTimeout() {
189180
}
190181

191182
/**
192-
* @param jobListOrException When listJobs is called, return jobList if Either is left, return
193-
* failedFuture if Either is right.
183+
* @param jobsOrException When the jobs overview is called, return the jobs if Either is left,
184+
* return failedFuture if Either is right.
194185
* @param configurationsOrException When fetch job conf, return configuration if Either is left,
195186
* return failedFuture if Either is right.
196187
* @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
197188
* called
198189
*/
199190
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
200191
getRestClusterClient(
201-
Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
192+
Either<Map<JobID, JobStatus>, Throwable> jobsOrException,
202193
Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
203194
AtomicLong closeCounter) {
204195
return conf ->
@@ -207,14 +198,6 @@ void testFetchConfigurationTimeout() {
207198
"test-cluster",
208199
(c, e) -> new StandaloneClientHAServices("localhost")) {
209200

210-
@Override
211-
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
212-
if (jobListOrException.isLeft()) {
213-
return CompletableFuture.completedFuture(jobListOrException.left());
214-
}
215-
return CompletableFuture.failedFuture(jobListOrException.right());
216-
}
217-
218201
@Override
219202
public <
220203
M extends MessageHeaders<R, P, U>,
@@ -231,6 +214,22 @@ CompletableFuture<P> sendRequest(M h, U p, R r) {
231214
return (CompletableFuture<P>)
232215
CompletableFuture.completedFuture(
233216
configurationsOrException.left().get(jobID));
217+
} else if (h instanceof JobsOverviewHeaders) {
218+
if (jobsOrException.isLeft()) {
219+
return (CompletableFuture<P>)
220+
CompletableFuture.completedFuture(
221+
new MultipleJobsDetails(
222+
jobsOrException.left().entrySet().stream()
223+
.map(
224+
entry ->
225+
generateJobDetails(
226+
entry
227+
.getKey(),
228+
entry
229+
.getValue()))
230+
.collect(Collectors.toList())));
231+
}
232+
return CompletableFuture.failedFuture(jobsOrException.right());
234233
}
235234
fail("Unknown request");
236235
return null;
@@ -245,15 +244,15 @@ public void close() {
245244
}
246245

247246
/**
248-
* @param jobList When listJobs is called, return jobList if it's not null, don't complete
247+
* @param jobs When the jobs overview is called, return these jobs if it's not null, don't complete
249248
* future if it's null.
250249
* @param configuration When fetch job conf, return configuration if it's not null, don't
251250
* complete future if it's null.
252251
* @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called.
253252
*/
254253
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
255254
getTimeoutableRestClusterClient(
256-
@Nullable Collection<JobStatusMessage> jobList,
255+
@Nullable Map<JobID, JobStatus> jobs,
257256
@Nullable ConfigurationInfo configuration,
258257
CompletableFuture<Void> closeFuture) {
259258
return conf ->
@@ -262,14 +261,6 @@ public void close() {
262261
"test-cluster",
263262
(c, e) -> new StandaloneClientHAServices("localhost")) {
264263

265-
@Override
266-
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
267-
if (jobList == null) {
268-
return new CompletableFuture<>();
269-
}
270-
return CompletableFuture.completedFuture(jobList);
271-
}
272-
273264
@Override
274265
public <
275266
M extends MessageHeaders<R, P, U>,
@@ -283,6 +274,21 @@ CompletableFuture<P> sendRequest(M h, U p, R r) {
283274
}
284275
return (CompletableFuture<P>)
285276
CompletableFuture.completedFuture(configuration);
277+
} else if (h instanceof JobsOverviewHeaders) {
278+
if (jobs == null) {
279+
return new CompletableFuture<>();
280+
}
281+
return (CompletableFuture<P>)
282+
CompletableFuture.completedFuture(
283+
new MultipleJobsDetails(
284+
jobs.entrySet().stream()
285+
.map(
286+
entry ->
287+
generateJobDetails(
288+
entry.getKey(),
289+
entry
290+
.getValue()))
291+
.collect(Collectors.toList())));
286292
}
287293
fail("Unknown request");
288294
return null;
@@ -295,4 +301,25 @@ public void close() {
295301
}
296302
};
297303
}
304+
305+
private static JobDetails generateJobDetails(JobID jobID, JobStatus jobStatus) {
306+
int[] countPerState = new int[ExecutionState.values().length];
307+
if (jobStatus == JobStatus.RUNNING) {
308+
countPerState[ExecutionState.RUNNING.ordinal()] = 5;
309+
countPerState[ExecutionState.FINISHED.ordinal()] = 2;
310+
} else if (jobStatus == JobStatus.CANCELLING) {
311+
countPerState[ExecutionState.CANCELING.ordinal()] = 7;
312+
}
313+
int numTasks = Arrays.stream(countPerState).sum();
314+
return new JobDetails(
315+
jobID,
316+
"test-job",
317+
System.currentTimeMillis(),
318+
-1,
319+
0,
320+
jobStatus,
321+
System.currentTimeMillis(),
322+
countPerState,
323+
numTasks);
324+
}
298325
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.autoscaler.utils;
19+
20+
import org.apache.flink.annotation.VisibleForTesting;
21+
import org.apache.flink.api.common.JobStatus;
22+
import org.apache.flink.runtime.client.JobStatusMessage;
23+
import org.apache.flink.runtime.execution.ExecutionState;
24+
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
25+
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.List;
31+
import java.util.stream.Collectors;
32+
33+
/** Job status related utilities. */
34+
public class JobStatusUtils {
35+
36+
private static final Logger LOG = LoggerFactory.getLogger(JobStatusUtils.class);
37+
38+
public static List<JobStatusMessage> toJobStatusMessage(
39+
MultipleJobsDetails multipleJobsDetails) {
40+
return multipleJobsDetails.getJobs().stream()
41+
.map(
42+
details ->
43+
new JobStatusMessage(
44+
details.getJobId(),
45+
details.getJobName(),
46+
getEffectiveStatus(details),
47+
details.getStartTime()))
48+
.collect(Collectors.toList());
49+
}
50+
51+
@VisibleForTesting
52+
static JobStatus getEffectiveStatus(JobDetails details) {
53+
int numRunning = details.getTasksPerState()[ExecutionState.RUNNING.ordinal()];
54+
int numFinished = details.getTasksPerState()[ExecutionState.FINISHED.ordinal()];
55+
boolean allRunningOrFinished = details.getNumTasks() == (numRunning + numFinished);
56+
JobStatus effectiveStatus = details.getStatus();
57+
if (JobStatus.RUNNING.equals(effectiveStatus) && !allRunningOrFinished) {
58+
LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, JobStatus.CREATED);
59+
return JobStatus.CREATED;
60+
}
61+
return effectiveStatus;
62+
}
63+
}

0 commit comments

Comments
 (0)