diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md index 0b3ee75723..b94f5aa6e5 100644 --- a/docs/content/docs/custom-resource/autoscaler.md +++ b/docs/content/docs/custom-resource/autoscaler.md @@ -275,6 +275,14 @@ org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \ Updating the `autoscaler.standalone.fetcher.flink-cluster.host` and `autoscaler.standalone.fetcher.flink-cluster.port` based on your flink cluster. In general, the host and port are the same as Flink WebUI. +To select the job fetcher use: + +``` +--autoscaler.standalone.fetcher.type FLINK_CLUSTER|YARN +``` + +When running against Flink-on-YARN (`YARN`), set the host/port to the YARN web proxy endpoint that exposes the JobManager REST API. + All autoscaler related options can be set at autoscaler standalone level, and the configuration at job-level can override the default value provided in the autoscaler standalone, such as: diff --git a/flink-autoscaler-standalone/README.md b/flink-autoscaler-standalone/README.md index dd835ae1d6..4d309b081b 100644 --- a/flink-autoscaler-standalone/README.md +++ b/flink-autoscaler-standalone/README.md @@ -69,7 +69,9 @@ Please click [here](../flink-autoscaler/README.md) to check out extensibility of `JobAutoScalerContext` of the job. It has a control loop that periodically calls `JobListFetcher#fetch` to fetch the job list and scale these jobs. -Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher` -interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far. -We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call -`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically. +Currently `FlinkClusterJobListFetcher` and `YarnJobListFetcher` are implementations of the +`JobListFetcher` interface. that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far. +`YarnJobListFetcher` enables fetching jobs and per-job configuration from +Flink-on-YARN clusters using a provided `RestClusterClient` + +Select which one to use via `autoscaler.standalone.fetcher.type` (`FLINK_CLUSTER` or `YARN`). diff --git a/flink-autoscaler-standalone/pom.xml b/flink-autoscaler-standalone/pom.xml index b270863253..b26af526c3 100644 --- a/flink-autoscaler-standalone/pom.xml +++ b/flink-autoscaler-standalone/pom.xml @@ -101,6 +101,12 @@ under the License. + + org.apache.flink + flink-yarn + ${flink.version} + + diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java index c249727025..526136038a 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java @@ -27,13 +27,16 @@ import org.apache.flink.autoscaler.ScalingExecutor; import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.autoscaler.event.AutoScalerEventHandler; +import org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions; import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher; import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer; +import org.apache.flink.autoscaler.standalone.yarn.YarnJobListFetcher; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.util.function.FunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,15 +76,24 @@ JobListFetcher createJobListFetcher(Configuration conf) { var port = conf.get(FETCHER_FLINK_CLUSTER_PORT); var restServerAddress = String.format("http://%s:%s", host, port); - return (JobListFetcher) - new FlinkClusterJobListFetcher( - configuration -> - new RestClusterClient<>( - configuration, - "clusterId", - (c, e) -> - new StandaloneClientHAServices(restServerAddress)), - conf.get(FLINK_CLIENT_TIMEOUT)); + var fetcherType = conf.get(AutoscalerStandaloneOptions.FETCHER_TYPE); + FunctionWithException, Exception> clientSupplier = + configuration -> + new RestClusterClient<>( + configuration, + "clusterId", + (c, e) -> new StandaloneClientHAServices(restServerAddress)); + + switch (fetcherType) { + case YARN: + return (JobListFetcher) + new YarnJobListFetcher(clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT)); + case FLINK_CLUSTER: + default: + return (JobListFetcher) + new FlinkClusterJobListFetcher( + clientSupplier, conf.get(FLINK_CLIENT_TIMEOUT)); + } } private static > diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java index 15b9b6a022..3a2dd42d3b 100644 --- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/config/AutoscalerStandaloneOptions.java @@ -49,6 +49,18 @@ private static ConfigOptions.OptionBuilder autoscalerStandaloneConfig(String key .defaultValue(100) .withDescription("The parallelism of autoscaler standalone control loop."); + public enum FetcherType { + FLINK_CLUSTER, + YARN + } + + public static final ConfigOption FETCHER_TYPE = + autoscalerStandaloneConfig("fetcher.type") + .enumType(FetcherType.class) + .defaultValue(FetcherType.FLINK_CLUSTER) + .withDescription( + "The job list fetcher type to use. Supported values: FLINK_CLUSTER, YARN."); + public static final ConfigOption FETCHER_FLINK_CLUSTER_HOST = autoscalerStandaloneConfig("fetcher.flink-cluster.host") .stringType() diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java new file mode 100644 index 0000000000..9d729598ba --- /dev/null +++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcher.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.standalone.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.JobAutoScalerContext; +import org.apache.flink.autoscaler.standalone.JobListFetcher; +import org.apache.flink.autoscaler.utils.JobStatusUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; +import org.apache.flink.util.function.FunctionWithException; + +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_HOST; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_FLINK_CLUSTER_PORT; + +/** Fetch JobAutoScalerContext based on Flink on YARN cluster. */ +public class YarnJobListFetcher implements JobListFetcher> { + + private final FunctionWithException, Exception> + restClientGetter; + private final Duration restClientTimeout; + + public YarnJobListFetcher( + FunctionWithException, Exception> + restClientGetter, + Duration restClientTimeout) { + this.restClientGetter = restClientGetter; + this.restClientTimeout = restClientTimeout; + } + + @Override + public Collection> fetch(Configuration baseConf) throws Exception { + + List> discovered = tryFetchFromFirstRunningYarnApp(baseConf); + if (!discovered.isEmpty()) { + return discovered; + } + + // use supplied client factory (may point to direct JM or a reverse proxy) + try (var restClusterClient = restClientGetter.apply(new Configuration())) { + return restClusterClient + .sendRequest( + JobsOverviewHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .thenApply(JobStatusUtils::toJobStatusMessage) + .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS) + .stream() + .map( + jobStatusMessage -> { + try { + return generateJobContext( + baseConf, restClusterClient, jobStatusMessage); + } catch (Throwable e) { + throw new RuntimeException( + "generateJobContext throw exception", e); + } + }) + .collect(Collectors.toList()); + } + } + + private List> tryFetchFromFirstRunningYarnApp( + Configuration baseConf) { + List> contexts = new ArrayList<>(); + YarnClient yarnClient = null; + try { + yarnClient = YarnClient.createYarnClient(); + org.apache.hadoop.conf.Configuration yarnConf = + new org.apache.hadoop.conf.Configuration(); + yarnClient.init(yarnConf); + yarnClient.start(); + + Set appTypes = new HashSet<>(); + appTypes.add("Apache Flink"); + List apps = yarnClient.getApplications(appTypes); + + String rmBase = + String.format( + "http://%s:%s", + baseConf.get(FETCHER_FLINK_CLUSTER_HOST), + baseConf.get(FETCHER_FLINK_CLUSTER_PORT)); + + for (ApplicationReport app : apps) { + if (app.getYarnApplicationState() != YarnApplicationState.RUNNING) { + continue; + } + String appId = app.getApplicationId().toString(); + String proxyBase = rmBase + "/proxy/" + appId; + + try (var client = + new RestClusterClient<>( + new Configuration(), + "clusterId", + (c, e) -> new StandaloneClientHAServices(proxyBase))) { + var fetched = + client + .sendRequest( + JobsOverviewHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .thenApply(JobStatusUtils::toJobStatusMessage) + .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS) + .stream() + .map( + jobStatusMessage -> { + try { + return generateJobContextForEndpoint( + baseConf, proxyBase, jobStatusMessage); + } catch (Throwable e) { + throw new RuntimeException( + "generateJobContext throw exception", + e); + } + }) + .collect(Collectors.toList()); + contexts.addAll(fetched); + } + break; + } + } catch (Throwable ignore) { + // Ignore + } finally { + if (yarnClient != null) { + try { + yarnClient.stop(); + } catch (Throwable ignored) { + } + } + } + return contexts; + } + + private JobAutoScalerContext generateJobContext( + Configuration baseConf, + RestClusterClient restClusterClient, + JobStatusMessage jobStatusMessage) + throws Exception { + var jobId = jobStatusMessage.getJobId(); + var conf = getConfiguration(baseConf, restClusterClient, jobId); + + return new JobAutoScalerContext<>( + jobId, + jobId, + jobStatusMessage.getJobState(), + conf, + new UnregisteredMetricsGroup(), + () -> restClientGetter.apply(conf)); + } + + private Configuration getConfiguration( + Configuration baseConf, RestClusterClient restClusterClient, JobID jobId) + throws Exception { + var jobParameters = new JobMessageParameters(); + jobParameters.jobPathParameter.resolve(jobId); + + var configurationInfo = + restClusterClient + .sendRequest( + JobManagerJobConfigurationHeaders.getInstance(), + jobParameters, + EmptyRequestBody.getInstance()) + .get(restClientTimeout.toSeconds(), TimeUnit.SECONDS); + + var conf = new Configuration(baseConf); + configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue())); + return conf; + } + + private JobAutoScalerContext generateJobContextForEndpoint( + Configuration baseConf, String endpointBase, JobStatusMessage jobStatusMessage) + throws Exception { + var jobId = jobStatusMessage.getJobId(); + try (var client = + new RestClusterClient<>( + new Configuration(), + "clusterId", + (c, e) -> new StandaloneClientHAServices(endpointBase))) { + var conf = getConfiguration(baseConf, client, jobId); + return new JobAutoScalerContext<>( + jobId, + jobId, + jobStatusMessage.getJobState(), + conf, + new UnregisteredMetricsGroup(), + () -> + new RestClusterClient<>( + conf, + "clusterId", + (c, e) -> new StandaloneClientHAServices(endpointBase))); + } + } +} diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypointTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypointTest.java index f01efd99fa..71ecbe1ab3 100644 --- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypointTest.java +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypointTest.java @@ -17,6 +17,9 @@ package org.apache.flink.autoscaler.standalone; +import org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions; +import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher; +import org.apache.flink.autoscaler.standalone.yarn.YarnJobListFetcher; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -25,8 +28,10 @@ import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL; import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM; +import static org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.FETCHER_TYPE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class StandaloneAutoscalerEntrypointTest { @Test @@ -36,17 +41,39 @@ public void testLoadConfiguration() { assertNotNull(conf); assertEquals(Duration.ofMinutes(1), conf.get(CONTROL_LOOP_INTERVAL)); assertEquals(20, conf.get(CONTROL_LOOP_PARALLELISM)); + assertEquals(AutoscalerStandaloneOptions.FetcherType.FLINK_CLUSTER, conf.get(FETCHER_TYPE)); // Test for args override String[] args = new String[] { "--autoscaler.standalone.control-loop.interval", "2min", "--autoscaler" + ".standalone.control-loop.parallelism", - "10" + "10", + "--autoscaler.standalone.fetcher.type", + "YARN" }; Configuration confOverride = StandaloneAutoscalerEntrypoint.loadConfiguration(args); assertNotNull(confOverride); assertEquals(Duration.ofMinutes(2), confOverride.get(CONTROL_LOOP_INTERVAL)); assertEquals(10, confOverride.get(CONTROL_LOOP_PARALLELISM)); + assertEquals(AutoscalerStandaloneOptions.FetcherType.YARN, confOverride.get(FETCHER_TYPE)); + } + + @Test + public void testFetcherTypeSelection() throws Exception { + // Default should select FlinkClusterJobListFetcher + Configuration confDefault = new Configuration(); + var method = + StandaloneAutoscalerEntrypoint.class.getDeclaredMethod( + "createJobListFetcher", Configuration.class); + method.setAccessible(true); + Object fetcherDefault = method.invoke(null, confDefault); + assertTrue(fetcherDefault instanceof FlinkClusterJobListFetcher); + + // YARN should select YarnJobListFetcher + Configuration confYarn = new Configuration(); + confYarn.set(FETCHER_TYPE, AutoscalerStandaloneOptions.FetcherType.YARN); + Object fetcherYarn = method.invoke(null, confYarn); + assertTrue(fetcherYarn instanceof YarnJobListFetcher); } } diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcherTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcherTest.java new file mode 100644 index 0000000000..14e36314a5 --- /dev/null +++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/yarn/YarnJobListFetcherTest.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.standalone.yarn; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.rest.messages.ConfigurationInfo; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; +import org.apache.flink.types.Either; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.fail; + +/** Test for {@link YarnJobListFetcher}. */ +class YarnJobListFetcherTest { + + /** Test whether the job list and confs are expected. */ + @Test + void testFetchJobListAndConfigurationInfo() throws Exception { + var job1 = new JobID(); + var job2 = new JobID(); + var jobs = Map.of(job1, JobStatus.RUNNING, job2, JobStatus.CANCELLING); + + Configuration expectedConf1 = new Configuration(); + expectedConf1.setString("option_key1", "option_value1"); + + Configuration expectedConf2 = new Configuration(); + expectedConf2.setString("option_key2", "option_value2"); + expectedConf2.setString("option_key3", "option_value3"); + + var configurations = Map.of(job1, expectedConf1, job2, expectedConf2); + var closeCounter = new AtomicLong(); + YarnJobListFetcher jobListFetcher = + new YarnJobListFetcher( + getRestClusterClient( + Either.Left(jobs), + Either.Left( + Map.of( + job1, + ConfigurationInfo.from(expectedConf1), + job2, + ConfigurationInfo.from(expectedConf2))), + closeCounter), + Duration.ofSeconds(10)); + + // Test for empty base conf + assertFetcherResult( + new Configuration(), jobs, configurations, closeCounter, jobListFetcher); + + // Add the new option key, and old option key with different value. + var baseConf = new Configuration(); + baseConf.setString("option_key4", "option_value4"); + baseConf.setString("option_key3", "option_value5"); + + closeCounter.set(0); + // Test for mixed base conf + assertFetcherResult( + new Configuration(), jobs, configurations, closeCounter, jobListFetcher); + } + + private void assertFetcherResult( + Configuration baseConf, + Map jobs, + Map configurations, + AtomicLong closeCounter, + YarnJobListFetcher jobListFetcher) + throws Exception { + // Fetch multiple times and check whether the results are as expected each time + for (int i = 1; i <= 3; i++) { + var fetchedJobList = jobListFetcher.fetch(baseConf); + // Check whether rest client is closed. + assertThat(closeCounter).hasValue(i); + + assertThat(fetchedJobList).hasSize(2); + for (var jobContext : fetchedJobList) { + var expectedJobStatus = jobs.get(jobContext.getJobID()); + + var expectedConf = new Configuration(baseConf); + expectedConf.addAll(configurations.get(jobContext.getJobID())); + assertThat(jobContext.getJobStatus()).isNotNull().isEqualTo(expectedJobStatus); + + assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf); + } + } + } + + /** + * Test whether the exception is expected after rest client fetches job list throws exception, + * and restClient can be closed normally. + */ + @Test + void testFetchJobListException() { + var expectedException = new RuntimeException("Expected exception."); + var closeCounter = new AtomicLong(); + + YarnJobListFetcher jobListFetcher = + new YarnJobListFetcher( + getRestClusterClient( + Either.Right(expectedException), + Either.Left(Map.of()), + closeCounter), + Duration.ofSeconds(10)); + assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration())) + .getCause() + .isEqualTo(expectedException); + assertThat(closeCounter).hasValue(1); + } + + /** + * Test whether the exception is expected after rest client fetches conf throws exception, and + * restClient can be closed normally. + */ + @Test + void testFetchConfigurationException() { + var expectedException = new RuntimeException("Expected exception."); + var closeCounter = new AtomicLong(); + + YarnJobListFetcher jobListFetcher = + new YarnJobListFetcher( + getRestClusterClient( + Either.Left(Map.of(new JobID(), JobStatus.RUNNING)), + Either.Right(expectedException), + closeCounter), + Duration.ofSeconds(10)); + + assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration())) + .getRootCause() + .isEqualTo(expectedException); + assertThat(closeCounter).hasValue(1); + } + + /** + * Test whether the exception is expected after rest client fetches job list timeout, and + * restClient can be closed normally. + */ + @Test + void testFetchJobListTimeout() { + CompletableFuture closeFuture = new CompletableFuture<>(); + YarnJobListFetcher jobListFetcher = + new YarnJobListFetcher( + getTimeoutableRestClusterClient(null, null, closeFuture), + Duration.ofSeconds(2)); + + assertThat(closeFuture).isNotDone(); + assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration())) + .isInstanceOf(TimeoutException.class); + assertThat(closeFuture).isDone(); + } + + /** + * Test whether the exception is expected after rest client fetches conf timeout, and restClient + * can be closed normally. + */ + @Test + void testFetchConfigurationTimeout() { + CompletableFuture closeFuture = new CompletableFuture<>(); + + YarnJobListFetcher jobListFetcher = + new YarnJobListFetcher( + getTimeoutableRestClusterClient( + Map.of(new JobID(), JobStatus.RUNNING), null, closeFuture), + Duration.ofSeconds(2)); + + assertThat(closeFuture).isNotDone(); + assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration())) + .getRootCause() + .isInstanceOf(TimeoutException.class); + assertThat(closeFuture).isDone(); + } + + /** + * @param jobsOrException When the jobs overview is called, return jobList if Either is left, + * return failedFuture if Either is right. + * @param configurationsOrException When fetch job conf, return configuration if Either is left, + * return failedFuture if Either is right. + * @param closeCounter Increment the count each time the {@link RestClusterClient#close} is + * called + */ + private static FunctionWithException, Exception> + getRestClusterClient( + Either, Throwable> jobsOrException, + Either, Throwable> configurationsOrException, + AtomicLong closeCounter) { + return conf -> + new RestClusterClient<>( + conf, + "test-cluster", + (c, e) -> new StandaloneClientHAServices("localhost")) { + + @Override + public < + M extends MessageHeaders, + U extends MessageParameters, + R extends RequestBody, + P extends ResponseBody> + CompletableFuture

sendRequest(M h, U p, R r) { + if (h instanceof JobManagerJobConfigurationHeaders) { + if (configurationsOrException.isRight()) { + return CompletableFuture.failedFuture( + configurationsOrException.right()); + } + var jobID = ((JobMessageParameters) p).jobPathParameter.getValue(); + return (CompletableFuture

) + CompletableFuture.completedFuture( + configurationsOrException.left().get(jobID)); + } else if (h instanceof JobsOverviewHeaders) { + if (jobsOrException.isLeft()) { + return (CompletableFuture

) + CompletableFuture.completedFuture( + new MultipleJobsDetails( + jobsOrException.left().entrySet().stream() + .map( + entry -> + generateJobDetails( + entry + .getKey(), + entry + .getValue())) + .collect(Collectors.toList()))); + } + return CompletableFuture.failedFuture(jobsOrException.right()); + } + fail("Unknown request"); + return null; + } + + @Override + public void close() { + super.close(); + closeCounter.incrementAndGet(); + } + }; + } + + /** + * @param jobs When the jobs overview is called, return jobList if it's not null, don't complete + * future if it's null. + * @param configuration When fetch job conf, return configuration if it's not null, don't + * complete future if it's null. + * @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called. + */ + private static FunctionWithException, Exception> + getTimeoutableRestClusterClient( + @Nullable Map jobs, + @Nullable ConfigurationInfo configuration, + CompletableFuture closeFuture) { + return conf -> + new RestClusterClient<>( + conf, + "test-cluster", + (c, e) -> new StandaloneClientHAServices("localhost")) { + + @Override + public < + M extends MessageHeaders, + U extends MessageParameters, + R extends RequestBody, + P extends ResponseBody> + CompletableFuture

sendRequest(M h, U p, R r) { + if (h instanceof JobManagerJobConfigurationHeaders) { + if (configuration == null) { + return new CompletableFuture<>(); + } + return (CompletableFuture

) + CompletableFuture.completedFuture(configuration); + } else if (h instanceof JobsOverviewHeaders) { + if (jobs == null) { + return new CompletableFuture<>(); + } + return (CompletableFuture

) + CompletableFuture.completedFuture( + new MultipleJobsDetails( + jobs.entrySet().stream() + .map( + entry -> + generateJobDetails( + entry.getKey(), + entry + .getValue())) + .collect(Collectors.toList()))); + } + fail("Unknown request"); + return null; + } + + @Override + public void close() { + super.close(); + closeFuture.complete(null); + } + }; + } + + private static JobDetails generateJobDetails(JobID jobID, JobStatus jobStatus) { + int[] countPerState = new int[ExecutionState.values().length]; + if (jobStatus == JobStatus.RUNNING) { + countPerState[ExecutionState.RUNNING.ordinal()] = 5; + countPerState[ExecutionState.FINISHED.ordinal()] = 2; + } else if (jobStatus == JobStatus.CANCELLING) { + countPerState[ExecutionState.CANCELING.ordinal()] = 7; + } + int numTasks = Arrays.stream(countPerState).sum(); + return new JobDetails( + jobID, + "test-job", + System.currentTimeMillis(), + -1, + 0, + jobStatus, + System.currentTimeMillis(), + countPerState, + numTasks); + } +}