8 changes: 8 additions & 0 deletions docs/content/docs/custom-resource/autoscaler.md
@@ -275,6 +275,14 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
Update the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`
based on your Flink cluster. In general, the host and port are the same as the Flink WebUI's.
To select the job fetcher, use:
```
--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN
```

> **Member:** How about introducing a whole demo for yarn mode?
When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API.
> **Member:**
> > set the host/port to the YARN web proxy endpoint
>
> Do you mean `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port`?
>
> If yes, it does not make sense, because all config options with the `autoscaler.standalone.fetcher.flink-cluster` prefix are related to the Flink cluster. It would be better to introduce YARN-cluster-related config options.
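A hedged sketch of what a YARN-mode demo might look like, based on what this PR wires up: `YarnJobListFetcher` builds the ResourceManager web address from the `flink-cluster` host/port options (a reuse the reviewer questions above), and the jar path here is illustrative only.

```
java -cp flink-autoscaler-standalone-*.jar \
org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
--autoscaler.standalone.fetcher.type YARN \
--autoscaler.standalone.fetcher.flink-cluster.host <yarn-resourcemanager-host> \
--autoscaler.standalone.fetcher.flink-cluster.port <yarn-resourcemanager-web-port>
```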

All autoscaler-related options can be set at the autoscaler standalone level, and job-level configuration can
override the default values provided by the autoscaler standalone, such as:
10 changes: 6 additions & 4 deletions flink-autoscaler-standalone/README.md
@@ -69,7 +69,9 @@ Please click [here](../flink-autoscaler/README.md) to check out extensibility of
`JobAutoScalerContext` of the job. It has a control loop that periodically calls
`JobListFetcher#fetch` to fetch the job list and scale these jobs.

Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher`
interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call
`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the
`JobListFetcher` interface; that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
> **Member:**
> > that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
>
> It no longer makes sense after adding YARN support, and the sentence should either be removed or rewritten to explain that each fetcher instance still monitors a single cluster or YARN deployment.
`YarnJobListFetcher` enables fetching jobs and per-job configuration from
Flink-on-YARN clusters using a provided `RestClusterClient`.

Select which one to use via `autoscaler.standalone.fetcher.type` (`FLINK_CLUSTER` or `YARN`).
6 changes: 6 additions & 0 deletions flink-autoscaler-standalone/pom.xml
@@ -101,6 +101,12 @@ under the License.
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn</artifactId>
<version>${flink.version}</version>
</dependency>
> **Member** (on lines +104 to +108): Is it possible to minimize the scope of dependencies? For example, only yarn-client is added here.
>
> Also, is it needed to exclude some dependencies to avoid dependency conflicts?
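A sketch of the reviewer's suggestion; the artifact id is the standard Hadoop YARN client, but the version property and the exclusion are assumptions to be checked against the actual dependency tree.

```xml
<!-- Narrow the scope to the YARN client only, and exclude transitive
     dependencies that commonly cause conflicts (illustrative exclusion). -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```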


<!-- Logging -->

<dependency>
@@ -27,13 +27,16 @@
import org.apache.flink.autoscaler.ScalingExecutor;
import org.apache.flink.autoscaler.ScalingMetricEvaluator;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions;
import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
import org.apache.flink.autoscaler.standalone.yarn.YarnJobListFetcher;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.util.function.FunctionWithException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,15 +76,24 @@ JobListFetcher<KEY, Context> createJobListFetcher(Configuration conf) {
var port = conf.get(FETCHER_FLINK_CLUSTER_PORT);
var restServerAddress = String.format("http://%s:%s", host, port);

return (JobListFetcher<KEY, Context>)
new FlinkClusterJobListFetcher(
configuration ->
new RestClusterClient<>(
configuration,
"clusterId",
(c, e) ->
new StandaloneClientHAServices(restServerAddress)),
conf.get(FLINK_CLIENT_TIMEOUT));
var fetcherType = conf.get(AutoscalerStandaloneOptions.FETCHER_TYPE);
FunctionWithException<Configuration, RestClusterClient<String>, Exception> clientSupplier =
configuration ->
new RestClusterClient<>(
configuration,
"clusterId",
(c, e) -> new StandaloneClientHAServices(restServerAddress));

switch (fetcherType) {
case YARN:
return (JobListFetcher<KEY, Context>)
new YarnJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
case FLINK_CLUSTER:
default:
return (JobListFetcher<KEY, Context>)
new FlinkClusterJobListFetcher(
clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
> **Member** (on lines +92 to +95): The default value of `AutoscalerStandaloneOptions.FETCHER_TYPE` is `FLINK_CLUSTER`, so including a `default` case that falls back to `FLINK_CLUSTER` here does not make sense, because it silently accepts invalid configuration values. Throwing an exception for unknown fetcher types is better; it could prevent potential bugs if a new type is introduced in the future.
}
}
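A minimal sketch of the reviewer's suggestion: keep `FLINK_CLUSTER` as an explicit case and fail fast on unrecognized fetcher types instead of silently falling back.

```java
switch (fetcherType) {
    case YARN:
        return (JobListFetcher<KEY, Context>)
                new YarnJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
    case FLINK_CLUSTER:
        return (JobListFetcher<KEY, Context>)
                new FlinkClusterJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT));
    default:
        // Surface unknown types as configuration errors rather than silent fallbacks.
        throw new IllegalArgumentException("Unsupported fetcher type: " + fetcherType);
}
```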

private static <KEY, Context extends JobAutoScalerContext<KEY>>
@@ -49,6 +49,18 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key
.defaultValue(100)
.withDescription("The parallelism of autoscaler standalone control loop.");

public enum FetcherType {
FLINK_CLUSTER,
YARN
}

public static final ConfigOption<FetcherType> FETCHER_TYPE =
autoscalerStandaloneConfig("fetcher.type")
.enumType(FetcherType.class)
.defaultValue(FetcherType.FLINK_CLUSTER)
.withDescription(
"The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN.");
> **Member** (on lines +57 to +62): Please generate docs according to https://github.com/apache/flink-kubernetes-operator/blob/main/docs/README.md. Also, IIRC, it is not needed to mention the values; doc tools will list all values by default.
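Per the reviewer's note, the description could drop the hand-written value list, assuming the docs generator enumerates enum values automatically:

```java
public static final ConfigOption<FetcherType> FETCHER_TYPE =
        autoscalerStandaloneConfig("fetcher.type")
                .enumType(FetcherType.class)
                .defaultValue(FetcherType.FLINK_CLUSTER)
                .withDescription("The job list fetcher type to use.");
```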


public static final ConfigOption<String> FETCHER_FLINK_CLUSTER_HOST =
autoscalerStandaloneConfig("fetcher.flink-cluster.host")
.stringType()
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone.yarn;

import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.util.function.FunctionWithException;

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST;
import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT;

/** Fetches JobAutoScalerContext for jobs running on a Flink-on-YARN cluster. */
public class YarnJobListFetcher implements JobListFetcher<JobID, JobAutoScalerContext<JobID>> {

private final FunctionWithException<Configuration, RestClusterClient<String>, Exception>
restClientGetter;
private final Duration restClientTimeout;

public YarnJobListFetcher(
FunctionWithException<Configuration, RestClusterClient<String>, Exception>
restClientGetter,
Duration restClientTimeout) {
this.restClientGetter = restClientGetter;
this.restClientTimeout = restClientTimeout;
}

@Override
public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {

List<JobAutoScalerContext<JobID>> discovered = tryFetchFromFirstRunningYarnApp(baseConf);
if (!discovered.isEmpty()) {
return discovered;
> **Member** (on lines +70 to +72): Sorry, I do not understand this part. Why only fetch Flink jobs on the first YARN application?
>
> IIUC, if the YARN cluster has multiple YARN applications, the autoscaler should work for all Flink jobs on this cluster, shouldn't it?

}

// use supplied client factory (may point to direct JM or a reverse proxy)
> **Member:** Why fall back to the JM or Flink cluster here? If this is what the user expects, why choose the YARN cluster fetcher instead of the Flink cluster fetcher?

try (var restClusterClient = restClientGetter.apply(new Configuration())) {
> **Member:** Why use an empty conf here? It discards all `baseConf` options that might be needed for the REST client setup, such as timeouts.
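A one-line sketch of the reviewer's point, assuming Flink's `Configuration` copy constructor to avoid mutating `baseConf`:

```java
// Propagate baseConf so REST client options (timeouts, SSL, ...) are honored.
try (var restClusterClient = restClientGetter.apply(new Configuration(baseConf))) {
    // ... unchanged request logic ...
}
```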

return restClusterClient
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.thenApply(JobStatusUtils::toJobStatusMessage)
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
.stream()
.map(
jobStatusMessage -> {
try {
return generateJobContext(
baseConf, restClusterClient, jobStatusMessage);
} catch (Throwable e) {
throw new RuntimeException(
"generateJobContext throw exception", e);
}
})
.collect(Collectors.toList());
}
}

private List<JobAutoScalerContext<JobID>> tryFetchFromFirstRunningYarnApp(
Configuration baseConf) {
List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
YarnClient yarnClient = null;
try {
yarnClient = YarnClient.createYarnClient();
org.apache.hadoop.conf.Configuration yarnConf =
new org.apache.hadoop.conf.Configuration();
yarnClient.init(yarnConf);
yarnClient.start();
> **Member** (on lines +104 to +108): Creating a YarnClient without any Hadoop configuration, I am not sure whether it works. Generally, it needs Hadoop configuration files like core-site.xml or yarn-site.xml that might be present on the classpath.
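A minimal sketch addressing this, assuming `hadoop-yarn-common` is on the classpath: `YarnConfiguration` loads yarn-site.xml / core-site.xml from the classpath, unlike a bare `org.apache.hadoop.conf.Configuration`.

```java
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new YarnConfiguration()); // picks up *-site.xml resources from the classpath
yarnClient.start();
```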


Set<String> appTypes = new HashSet<>();
appTypes.add("Apache Flink");
List<ApplicationReport> apps = yarnClient.getApplications(appTypes);

String rmBase =
String.format(
"http://%s:%s",
baseConf.get(FETCHER_FLINK_CLUSTER_HOST),
baseConf.get(FETCHER_FLINK_CLUSTER_PORT));

for (ApplicationReport app : apps) {
if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) {
continue;
}
String appId = app.getApplicationId().toString();
String proxyBase = rmBase + "/proxy/" + appId;

try (var client =
new RestClusterClient<>(
new Configuration(),
"clusterId",
(c, e) -> new StandaloneClientHAServices(proxyBase))) {
var fetched =
client
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.thenApply(JobStatusUtils::toJobStatusMessage)
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
.stream()
.map(
jobStatusMessage -> {
try {
return generateJobContextForEndpoint(
baseConf, proxyBase, jobStatusMessage);
} catch (Throwable e) {
throw new RuntimeException(
"generateJobContext throw exception",
e);
}
})
.collect(Collectors.toList());
contexts.addAll(fetched);
}
break;
}
} catch (Throwable ignore) {
> **Member:** This catch does not provide fault isolation among jobs or YARN applications; if one job is stuck on GC or something else, the autoscaler won't work for any application.

// Ignore
> **Member** (on lines +157 to +158): It suppresses all exceptions, including critical ones like OutOfMemoryError, without any logging, making it impossible to diagnose why YARN-based job discovery failed: you cannot tell whether there are configuration issues, network problems, or authentication failures.

} finally {
if (yarnClient != null) {
try {
yarnClient.stop();
} catch (Throwable ignored) {
}
}
}
return contexts;
}
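A sketch of the loop shape the reviewers are asking for: visit every RUNNING Flink application (no `break`), isolate per-application failures, and log instead of swallowing exceptions. `fetchJobsViaProxy` is a hypothetical helper wrapping the per-application REST calls above, and `LOG` is assumed to be the class's slf4j logger.

```java
for (ApplicationReport app : apps) {
    if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) {
        continue;
    }
    String proxyBase = rmBase + "/proxy/" + app.getApplicationId();
    try {
        // Hypothetical helper: the per-application REST fetch shown above.
        contexts.addAll(fetchJobsViaProxy(baseConf, proxyBase));
    } catch (Exception e) {
        // One misbehaving application must not break discovery for the rest.
        LOG.warn("Failed to fetch jobs for YARN application {}", app.getApplicationId(), e);
    }
}
```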

private JobAutoScalerContext<JobID> generateJobContext(
Configuration baseConf,
RestClusterClient<String> restClusterClient,
JobStatusMessage jobStatusMessage)
throws Exception {
var jobId = jobStatusMessage.getJobId();
var conf = getConfiguration(baseConf, restClusterClient, jobId);

return new JobAutoScalerContext<>(
jobId,
jobId,
jobStatusMessage.getJobState(),
conf,
new UnregisteredMetricsGroup(),
() -> restClientGetter.apply(conf));
}

private Configuration getConfiguration(
Configuration baseConf, RestClusterClient<String> restClusterClient, JobID jobId)
throws Exception {
var jobParameters = new JobMessageParameters();
jobParameters.jobPathParameter.resolve(jobId);

var configurationInfo =
restClusterClient
.sendRequest(
JobManagerJobConfigurationHeaders.getInstance(),
jobParameters,
EmptyRequestBody.getInstance())
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);

var conf = new Configuration(baseConf);
configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
return conf;
}

private JobAutoScalerContext<JobID> generateJobContextForEndpoint(
Configuration baseConf, String endpointBase, JobStatusMessage jobStatusMessage)
throws Exception {
var jobId = jobStatusMessage.getJobId();
try (var client =
new RestClusterClient<>(
new Configuration(),
"clusterId",
(c, e) -> new StandaloneClientHAServices(endpointBase))) {
var conf = getConfiguration(baseConf, client, jobId);
return new JobAutoScalerContext<>(
jobId,
jobId,
jobStatusMessage.getJobState(),
conf,
new UnregisteredMetricsGroup(),
() ->
new RestClusterClient<>(
conf,
"clusterId",
(c, e) -> new StandaloneClientHAServices(endpointBase)));
}
}
}