Skip to content

Commit 249a694

Browse files
committed
[FLINK-35105][autoscaler] Support setting default Autoscaler options at autoscaler standalone level
1 parent 9db83bc commit 249a694

File tree

7 files changed

+78
-20
lines changed

7 files changed

+78
-20
lines changed

docs/content.zh/docs/custom-resource/autoscaler.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
275275
Update the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`
276276
based on your Flink cluster. In general, the host and port are the same as those of the Flink WebUI.
277277
278+
All autoscaler-related options can be set at the autoscaler standalone level, and the job-level configuration can
279+
override the default values provided at the autoscaler standalone level, such as:
280+
281+
- job.autoscaler.enabled
282+
- job.autoscaler.metrics.window
283+
- etc
284+
278285
### Using the JDBC Autoscaler State Store & Event Handler
279286
280287
A driver dependency is required to connect to a specified database. Here are drivers currently supported,

docs/content/docs/custom-resource/autoscaler.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,13 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
275275
Update the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`
276276
based on your Flink cluster. In general, the host and port are the same as those of the Flink WebUI.
277277
278+
All autoscaler-related options can be set at the autoscaler standalone level, and the job-level configuration can
279+
override the default values provided at the autoscaler standalone level, such as:
280+
281+
- job.autoscaler.enabled
282+
- job.autoscaler.metrics.window
283+
- etc
284+
278285
### Using the JDBC Autoscaler State Store & Event Handler
279286
280287
A driver dependency is required to connect to a specified database. Here are drivers currently supported,

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/JobListFetcher.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,19 @@
1919

2020
import org.apache.flink.annotation.Experimental;
2121
import org.apache.flink.autoscaler.JobAutoScalerContext;
22+
import org.apache.flink.configuration.Configuration;
2223

2324
import java.util.Collection;
2425

2526
/** The JobListFetcher will fetch the jobContext of all jobs. */
2627
@Experimental
2728
public interface JobListFetcher<KEY, Context extends JobAutoScalerContext<KEY>> {
2829

29-
Collection<Context> fetch() throws Exception;
30+
/**
31+
* Fetch the job contexts of all jobs.
32+
*
33+
* @param baseConf The basic configuration for standalone autoscaler. The basic configuration
34+
* can be overridden by the configuration at job-level.
35+
*/
36+
Collection<Context> fetch(Configuration baseConf) throws Exception;
3037
}

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.autoscaler.JobAutoScalerContext;
2323
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
2424
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.configuration.UnmodifiableConfiguration;
2526
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
2627

2728
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -60,6 +61,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
6061
private final JobAutoScaler<KEY, Context> autoScaler;
6162
private final ScheduledExecutorService scheduledExecutorService;
6263
private final ExecutorService scalingThreadPool;
64+
private final UnmodifiableConfiguration baseConf;
6365

6466
/**
6567
* Maintain a set of keys of the jobs that are currently scaling; it should be updated at {@link
@@ -88,6 +90,7 @@ public StandaloneAutoscalerExecutor(
8890
Executors.newFixedThreadPool(
8991
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
9092
this.scalingJobKeys = new HashSet<>();
93+
this.baseConf = new UnmodifiableConfiguration(conf);
9194
}
9295

9396
public void start() {
@@ -107,7 +110,7 @@ protected void scaling() {
107110
LOG.info("Standalone autoscaler starts scaling.");
108111
Collection<Context> jobList;
109112
try {
110-
jobList = jobListFetcher.fetch();
113+
jobList = jobListFetcher.fetch(baseConf);
111114
} catch (Throwable e) {
112115
LOG.error("Error while fetch job list.", e);
113116
return;

flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcher.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public FlinkClusterJobListFetcher(
5454
}
5555

5656
@Override
57-
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
57+
public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
5858
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
5959
return restClusterClient
6060
.sendRequest(
@@ -67,7 +67,8 @@ public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
6767
.map(
6868
jobStatusMessage -> {
6969
try {
70-
return generateJobContext(restClusterClient, jobStatusMessage);
70+
return generateJobContext(
71+
baseConf, restClusterClient, jobStatusMessage);
7172
} catch (Throwable e) {
7273
throw new RuntimeException(
7374
"generateJobContext throw exception", e);
@@ -78,10 +79,12 @@ public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
7879
}
7980

8081
private JobAutoScalerContext<JobID> generateJobContext(
81-
RestClusterClient<String> restClusterClient, JobStatusMessage jobStatusMessage)
82+
Configuration baseConf,
83+
RestClusterClient<String> restClusterClient,
84+
JobStatusMessage jobStatusMessage)
8285
throws Exception {
8386
var jobId = jobStatusMessage.getJobId();
84-
var conf = getConfiguration(restClusterClient, jobId);
87+
var conf = getConfiguration(baseConf, restClusterClient, jobId);
8588

8689
return new JobAutoScalerContext<>(
8790
jobId,
@@ -92,7 +95,8 @@ private JobAutoScalerContext<JobID> generateJobContext(
9295
() -> restClientGetter.apply(conf));
9396
}
9497

95-
private Configuration getConfiguration(RestClusterClient<String> restClusterClient, JobID jobId)
98+
private Configuration getConfiguration(
99+
Configuration baseConf, RestClusterClient<String> restClusterClient, JobID jobId)
96100
throws Exception {
97101
var jobParameters = new JobMessageParameters();
98102
jobParameters.jobPathParameter.resolve(jobId);
@@ -105,7 +109,7 @@ private Configuration getConfiguration(RestClusterClient<String> restClusterClie
105109
EmptyRequestBody.getInstance())
106110
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
107111

108-
var conf = new Configuration();
112+
var conf = new Configuration(baseConf);
109113
configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
110114
return conf;
111115
}

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutorTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void cleanup(JobID jobKey) {
8484

8585
try (var autoscalerExecutor =
8686
new StandaloneAutoscalerExecutor<>(
87-
conf, () -> jobList, eventCollector, jobAutoScaler) {
87+
conf, baseConf -> jobList, eventCollector, jobAutoScaler) {
8888
@Override
8989
protected void scalingSingleJob(JobAutoScalerContext<JobID> jobContext) {
9090
super.scalingSingleJob(jobContext);
@@ -112,7 +112,7 @@ void testFetchException() {
112112
try (var autoscalerExecutor =
113113
new StandaloneAutoscalerExecutor<>(
114114
new Configuration(),
115-
() -> {
115+
baseConf -> {
116116
throw new RuntimeException("Excepted exception.");
117117
},
118118
eventCollector,
@@ -149,7 +149,7 @@ void testScalingParallelism() {
149149
try (var autoscalerExecutor =
150150
new StandaloneAutoscalerExecutor<>(
151151
conf,
152-
() -> jobList,
152+
baseConf -> jobList,
153153
new TestingEventCollector<>(),
154154
new JobAutoScaler<>() {
155155
@Override
@@ -197,7 +197,7 @@ void testOneJobScalingSlow() throws Exception {
197197
try (var autoscalerExecutor =
198198
new StandaloneAutoscalerExecutor<>(
199199
conf,
200-
jobContextWithIndex::keySet,
200+
baseConf -> jobContextWithIndex.keySet(),
201201
new TestingEventCollector<>(),
202202
new JobAutoScaler<>() {
203203
@Override

flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/flinkcluster/FlinkClusterJobListFetcherTest.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,42 @@ void testFetchJobListAndConfigurationInfo() throws Exception {
8484
closeCounter),
8585
Duration.ofSeconds(10));
8686

87+
// Test for empty base conf
88+
assertFetcherResult(
89+
new Configuration(), jobs, configurations, closeCounter, jobListFetcher);
90+
91+
// Add a new option key, and an existing option key with a different value.
92+
var baseConf = new Configuration();
93+
baseConf.setString("option_key4", "option_value4");
94+
baseConf.setString("option_key3", "option_value5");
95+
96+
closeCounter.set(0);
97+
// Test for mixed base conf: pass the populated baseConf (not an empty Configuration) so that
// merging the base options with the job-level options is actually exercised.
98+
assertFetcherResult(
99+
baseConf, jobs, configurations, closeCounter, jobListFetcher);
100+
}
101+
102+
private void assertFetcherResult(
103+
Configuration baseConf,
104+
Map<JobID, JobStatus> jobs,
105+
Map<JobID, Configuration> configurations,
106+
AtomicLong closeCounter,
107+
FlinkClusterJobListFetcher jobListFetcher)
108+
throws Exception {
87109
// Fetch multiple times and check whether the results are as expected each time
88110
for (int i = 1; i <= 3; i++) {
89-
var fetchedJobList = jobListFetcher.fetch();
111+
var fetchedJobList = jobListFetcher.fetch(baseConf);
90112
// Check whether rest client is closed.
91113
assertThat(closeCounter).hasValue(i);
92114

93115
assertThat(fetchedJobList).hasSize(2);
94116
for (var jobContext : fetchedJobList) {
95-
var expectedJobState = jobs.get(jobContext.getJobID());
96-
Configuration expectedConf = configurations.get(jobContext.getJobID());
97-
assertThat(jobContext.getJobStatus()).isEqualTo(expectedJobState);
117+
var expectedJobStatus = jobs.get(jobContext.getJobID());
118+
119+
var expectedConf = new Configuration(baseConf);
120+
expectedConf.addAll(configurations.get(jobContext.getJobID()));
121+
assertThat(jobContext.getJobStatus()).isNotNull().isEqualTo(expectedJobStatus);
122+
98123
assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
99124
}
100125
}
@@ -116,7 +141,9 @@ void testFetchJobListException() {
116141
Either.Left(Map.of()),
117142
closeCounter),
118143
Duration.ofSeconds(10));
119-
assertThatThrownBy(jobListFetcher::fetch).getCause().isEqualTo(expectedException);
144+
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
145+
.getCause()
146+
.isEqualTo(expectedException);
120147
assertThat(closeCounter).hasValue(1);
121148
}
122149

@@ -137,7 +164,9 @@ void testFetchConfigurationException() {
137164
closeCounter),
138165
Duration.ofSeconds(10));
139166

140-
assertThatThrownBy(jobListFetcher::fetch).getRootCause().isEqualTo(expectedException);
167+
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
168+
.getRootCause()
169+
.isEqualTo(expectedException);
141170
assertThat(closeCounter).hasValue(1);
142171
}
143172

@@ -154,7 +183,8 @@ void testFetchJobListTimeout() {
154183
Duration.ofSeconds(2));
155184

156185
assertThat(closeFuture).isNotDone();
157-
assertThatThrownBy(jobListFetcher::fetch).isInstanceOf(TimeoutException.class);
186+
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
187+
.isInstanceOf(TimeoutException.class);
158188
assertThat(closeFuture).isDone();
159189
}
160190

@@ -173,7 +203,7 @@ void testFetchConfigurationTimeout() {
173203
Duration.ofSeconds(2));
174204

175205
assertThat(closeFuture).isNotDone();
176-
assertThatThrownBy(jobListFetcher::fetch)
206+
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
177207
.getRootCause()
178208
.isInstanceOf(TimeoutException.class);
179209
assertThat(closeFuture).isDone();

0 commit comments

Comments
 (0)