diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java index 4b4d61ed6c201..028f0f1b084dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java @@ -343,7 +343,7 @@ protected static RetryPolicy createRetryPolicy(Configuration conf, return RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, - failoverSleepBaseMs, failoverSleepMaxMs); + maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs); } if (rmConnectionRetryIntervalMS < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java index c70b7b5eb5435..6a0b26faf247d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java @@ -18,12 +18,13 @@ package org.apache.hadoop.yarn.server.federation.policies; +import java.io.IOException; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -138,15 +139,16 @@ public void setPolicyContext( * @return the map of ids to info for all active subclusters. * * @throws YarnException if we can't get the list. + * @throws IOException if no active subclusters. */ protected Map getActiveSubclusters() - throws YarnException { + throws YarnException, IOException { Map activeSubclusters = getPolicyContext().getFederationStateStoreFacade().getSubClusters(true); if (activeSubclusters == null || activeSubclusters.size() < 1) { - throw new NoActiveSubclustersException( + throw new RetriableException( "Zero active subclusters, cannot pick where to send job."); } return activeSubclusters; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java index c18d3955e6d78..f6c9bb6775bb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -25,10 +26,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -185,12 +186,12 @@ public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue, * @param blackListSubClusters the list of subClusters as identified by * {@link SubClusterId} to blackList from the selection of the home * subCluster. - * @throws FederationPolicyException if there are no usable subclusters. + * @throws IOException if there are no usable subclusters. */ public static void validateSubClusterAvailability( Collection activeSubClusters, Collection blackListSubClusters) - throws FederationPolicyException { + throws IOException { if (activeSubClusters != null && !activeSubClusters.isEmpty()) { if (blackListSubClusters == null) { return; @@ -202,7 +203,7 @@ public static void validateSubClusterAvailability( } } } - throw new FederationPolicyException( + throw new RetriableException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 68eceb5a325a6..564379a67549d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.List; @@ -124,10 +125,11 @@ public RouterPolicyFacade(Configuration conf, * * @throws YarnException if there are issues initializing policies, or no * valid sub-cluster id could be found for this app. + * @throws IOException if no active subclusters. */ public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException { + List blackListSubClusters) throws YarnException, IOException { // the maps are concurrent, but we need to protect from reset() // reinitialization mid-execution by creating a new reference local to this @@ -231,9 +233,10 @@ public synchronized void reset() { * * @throws YarnException if there are issues initializing policies, or no * valid sub-cluster id could be found for this reservation. + * @throws IOException if no active subclusters. */ public SubClusterId getReservationHomeSubCluster( - ReservationSubmissionRequest request) throws YarnException { + ReservationSubmissionRequest request) throws YarnException, IOException { // the maps are concurrent, but we need to protect from reset() // reinitialization mid-execution by creating a new reference local to this diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java index 36074f989fd1a..5b76f060573b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,7 +52,7 @@ public void reinitialize( @Override public Map> splitResourceRequests( List resourceRequests, - Set timedOutSubClusters) throws YarnException { + Set timedOutSubClusters) throws YarnException, IOException { Map activeSubclusters = getActiveSubclusters(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java index 3d39d7280d4e3..497df7ac5151c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,10 +47,11 @@ public interface FederationAMRMProxyPolicy * list of {@link ResourceRequest}s that should be forwarded to it * @throws YarnException in case the request is malformed or no viable * sub-clusters can be found. + * @throws IOException if no active subclusters. */ Map> splitResourceRequests( List resourceRequests, - Set timedOutSubClusters) throws YarnException; + Set timedOutSubClusters) throws YarnException, IOException; /** * This method should be invoked to notify the policy about responses being diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java index acb7e0af1834f..cee07b84842e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/HomeAMRMProxyPolicy.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; @@ -57,14 +59,14 @@ public void reinitialize( @Override public Map> splitResourceRequests( List resourceRequests, - Set timedOutSubClusters) throws YarnException { + Set timedOutSubClusters) throws YarnException, IOException { if (homeSubcluster == null) { throw new FederationPolicyException("No home subcluster available"); } Map active = getActiveSubclusters(); if (!active.containsKey(homeSubcluster)) { - throw new FederationPolicyException( + throw new RetriableException( "The local subcluster " + homeSubcluster + " is not active"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index df3e222cc5dda..d3e10685f754c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.amrmproxy; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -34,6 +35,7 @@ import org.apache.commons.collections4.MapUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; @@ -46,7 +48,6 @@ import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -267,7 +268,7 @@ public void notifyOfResponse(SubClusterId subClusterId, @Override public Map> splitResourceRequests( List resourceRequests, - Set timedOutSubClusters) throws YarnException { + Set timedOutSubClusters) throws YarnException, IOException { // object used to accumulate statistics about the answer, initialize with // active subclusters. Create a new instance per call because this method @@ -712,7 +713,8 @@ protected final class AllocationBookkeeper { private void reinitialize( Map activeSubclusters, - Set timedOutSubClusters, Configuration pConf) throws YarnException { + Set timedOutSubClusters, Configuration pConf) + throws YarnException, IOException { if (MapUtils.isEmpty(activeSubclusters)) { throw new YarnRuntimeException("null activeSubclusters received"); @@ -752,7 +754,7 @@ private void reinitialize( String errorMsg = "None of the subClusters enabled in this Policy (weight > 0) are " + "currently active we cannot forward the ResourceRequest(s)"; if (failOnError) { - throw new NoActiveSubclustersException(errorMsg); + throw new RetriableException(errorMsg); } else { LOG.error(errorMsg + ", continuing by enabling all active subClusters."); activeAndEnabledSC.addAll(activeSubclusters.keySet()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java index fc44433e17eee..496959a96b71e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Collections; @@ -82,9 +83,11 @@ public void validate(ApplicationSubmissionContext appSubmissionContext) * @return the chosen sub-cluster * * @throws YarnException if the policy fails to choose a sub-cluster + * @throws IOException if no active subclusters. */ protected abstract SubClusterId chooseSubCluster(String queue, - Map preSelectSubClusters) throws YarnException; + Map preSelectSubClusters) + throws YarnException, IOException; /** * Filter chosen SubCluster based on reservationId. @@ -130,11 +133,11 @@ protected Map prefilterSubClusters( * @return a hash-based chosen {@link SubClusterId} that will be the "home" * for this application. * - * @throws YarnException if there are no active subclusters. + * @throws IOException if there are no active subclusters. */ @Override public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext, - List blackLists) throws YarnException { + List blackLists) throws YarnException, IOException { // null checks and default-queue behavior validate(appContext); @@ -169,7 +172,7 @@ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext, */ @Override public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request) - throws YarnException { + throws YarnException, IOException { if (request == null) { throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java index af5810665913c..6b45440985146 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.List; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; @@ -46,10 +47,11 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy { * application. * * @throws YarnException if the policy cannot determine a viable subcluster. + * @throws IOException if no active subclusters. */ SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException; + List blackListSubClusters) throws YarnException, IOException; /** * Determines the sub-cluster where a ReservationSubmissionRequest should be @@ -59,7 +61,8 @@ SubClusterId getHomeSubcluster( * @return a mapping of sub-clusters and the requests * * @throws YarnException if the policy fails to choose a sub-cluster + * @throws IOException if no active subclusters. */ SubClusterId getReservationHomeSubcluster( - ReservationSubmissionRequest request) throws YarnException; + ReservationSubmissionRequest request) throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index a86a43a213de0..979f92489e7ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,12 +17,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -62,7 +63,8 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) @Override protected SubClusterId chooseSubCluster( - String queue, Map preSelectSubclusters) throws YarnException { + String queue, Map preSelectSubclusters) + throws YarnException, IOException { Map weights = getPolicyInfo().getRouterPolicyWeights(); SubClusterIdInfo chosen = null; long currBestMem = -1; @@ -77,7 +79,7 @@ protected SubClusterId chooseSubCluster( } } if (chosen == null) { - throw new FederationPolicyException( + throw new RetriableException( "Zero Active Subcluster with weight 1."); } return chosen.toId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java index 3abcf6fa378e4..56318ca72d2bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LocalityRouterPolicy.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -90,7 +91,7 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) @Override public SubClusterId getHomeSubcluster( ApplicationSubmissionContext appSubmissionContext, - List blackListSubClusters) throws YarnException { + List blackListSubClusters) throws YarnException, IOException { // null checks and default-queue behavior validate(appSubmissionContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index 7d50d3814a0dd..d259a68390052 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,10 +17,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy { @Override protected SubClusterId chooseSubCluster( - String queue, Map preSelectSubclusters) throws YarnException { + String queue, Map preSelectSubclusters) + throws YarnException, IOException { // This finds the sub-cluster with the highest weight among the // currently active ones. Map weights = getPolicyInfo().getRouterPolicyWeights(); @@ -48,7 +50,7 @@ protected SubClusterId chooseSubCluster( } } if (chosen == null) { - throw new FederationPolicyException( + throw new RetriableException( "No Active Subcluster with weight vector greater than zero."); } return chosen; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index 353329613ab97..6eaf42942b120 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -17,15 +17,16 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -59,9 +60,10 @@ public void reinitialize(FederationPolicyInitializationContext policyContext) @Override protected SubClusterId chooseSubCluster( - String queue, Map preSelectSubclusters) throws YarnException { + String queue, Map preSelectSubclusters) + throws YarnException, IOException { if (preSelectSubclusters == null || preSelectSubclusters.isEmpty()) { - throw new FederationPolicyException("No available subcluster to choose from."); + throw new RetriableException("No available subcluster to choose from."); } List list = new ArrayList<>(preSelectSubclusters.keySet()); return list.get(rand.nextInt(list.size())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index f2acf663603f6..3515d98f81006 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,12 +18,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.ArrayList; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -35,7 +36,8 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy { @Override protected SubClusterId chooseSubCluster( - String queue, Map preSelectSubclusters) throws YarnException { + String queue, Map preSelectSubclusters) + throws YarnException, IOException { // note: we cannot pre-compute the weights, as the set of activeSubCluster // changes dynamically (and this would unfairly spread the load to @@ -55,7 +57,7 @@ protected SubClusterId chooseSubCluster( int pickedIndex = FederationPolicyUtils.getWeightedRandom(weightList); if (pickedIndex == -1) { - throw new FederationPolicyException("No positive weight found on active subclusters"); + throw new RetriableException("No positive weight found on active subclusters"); } return scIdList.get(pickedIndex); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java index a32ae38ff2623..bc64697b35f0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java @@ -36,6 +36,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -47,7 +48,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.federation.cache.FederationCache; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException; @@ -800,16 +800,16 @@ public int getActiveSubClustersCount() throws YarnException { * @param activeSubClusters List of active subClusters. * @param blackList blacklist. * @return Active SubClusterId. - * @throws YarnException When there is no Active SubCluster, + * @throws IOException When there is no Active SubCluster, * an exception will be thrown (No active SubCluster available to submit the request.) */ public static SubClusterId getRandomActiveSubCluster( Map activeSubClusters, List blackList) - throws YarnException { + throws IOException { // Check if activeSubClusters is empty, if it is empty, we need to throw an exception if (MapUtils.isEmpty(activeSubClusters)) { - throw new FederationPolicyException( + throw new RetriableException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } @@ -823,7 +823,7 @@ public static SubClusterId getRandomActiveSubCluster( // Check there are still active subcluster after removing the blacklist if (CollectionUtils.isEmpty(subClusterIds)) { - throw new FederationPolicyException( + throw new RetriableException( FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java index 8d6fc50c6c982..d2277d5fa2e83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.federation.policies; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; import static org.mockito.Mockito.mock; @@ -30,12 +32,14 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.RejectAMRMProxyPolicy; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; @@ -125,23 +129,35 @@ public void testReinitilializeBad3() throws YarnException { @Test public void testNoSubclusters() throws YarnException { - assertThrows(FederationPolicyException.class, () -> { - // empty the activeSubclusters map - FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), - getPolicyInfo(), new HashMap<>()); + // empty the activeSubclusters map + FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), + getPolicyInfo(), new HashMap<>()); - ConfigurableFederationPolicy localPolicy = getPolicy(); - if (localPolicy instanceof FederationRouterPolicy) { + ConfigurableFederationPolicy localPolicy = getPolicy(); + if (localPolicy instanceof FederationRouterPolicy) { + try { ((FederationRouterPolicy) localPolicy) - .getHomeSubcluster(getApplicationSubmissionContext(), null); - } else { + .getHomeSubcluster(getApplicationSubmissionContext(), null); + fail(); + } catch (Exception e) { + assertTrue(e instanceof RetriableException); + } + } else { + try { String[] hosts = new String[] {"host1", "host2"}; List resourceRequests = FederationPoliciesTestUtil .createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false); ((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests( resourceRequests, new HashSet()); + fail(); + } catch (Exception e) { + if (localPolicy instanceof RejectAMRMProxyPolicy) { + assertTrue(e instanceof FederationPolicyException); + } else { + assertTrue(e instanceof RetriableException); + } } - }); + } } public ConfigurableFederationPolicy getPolicy() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java index 020dc0b857f2e..9859940f73f6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestRouterPolicyFacade.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -85,7 +86,7 @@ public void setup() throws YarnException { } @Test - public void testConfigurationUpdate() throws YarnException { + public void testConfigurationUpdate() throws YarnException, IOException { // in this test we see what happens when the configuration is changed // between calls. We achieve this by changing what is in the store. @@ -115,7 +116,7 @@ public void testConfigurationUpdate() throws YarnException { } @Test - public void testGetHomeSubcluster() throws YarnException { + public void testGetHomeSubcluster() throws YarnException, IOException { ApplicationSubmissionContext applicationSubmissionContext = mock(ApplicationSubmissionContext.class); @@ -146,7 +147,7 @@ public void testGetHomeSubcluster() throws YarnException { } @Test - public void testFallbacks() throws YarnException { + public void testFallbacks() throws YarnException, IOException { // this tests the behavior of the system when the queue requested is // not configured (or null) and there is no default policy configured diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java index 31646ab6e9ed2..b1f754b3699ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestHomeAMRMProxyPolicy.java @@ -25,16 +25,17 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -72,7 +73,7 @@ public void setUp() throws Exception { } @Test - public void testSplitAllocateRequest() throws YarnException { + public void testSplitAllocateRequest() throws YarnException, IOException { // Verify the request only goes to the home subcluster String[] hosts = new String[] {"host0", "host1", "host2", "host3"}; @@ -89,7 +90,7 @@ public void testSplitAllocateRequest() throws YarnException { } @Test - public void testHomeSubclusterNotActive() throws YarnException { + public void testHomeSubclusterNotActive() throws YarnException, IOException { // We setup the home subcluster to a non-existing one initializePolicyContext(getPolicy(), mock(WeightedPolicyInfo.class), @@ -104,7 +105,7 @@ public void testHomeSubclusterNotActive() throws YarnException { federationPolicy.splitResourceRequests(resourceRequests, new HashSet()); fail("It should fail when the home subcluster is not active"); - } catch(FederationPolicyException e) { + } catch(RetriableException e) { GenericTestUtils.assertExceptionContains("is not active", e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java index 3efa6086e683a..5b0e89a016bfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/TestLocalityMulticastAMRMProxyPolicy.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -683,7 +684,7 @@ public void testIntegerAssignment() throws YarnException { } @Test - public void testCancelWithLocalizedResource() throws YarnException { + public void testCancelWithLocalizedResource() throws YarnException, IOException { // Configure policy to be 100% headroom based getPolicyInfo().setHeadroomAlpha(1.0f); @@ -814,7 +815,7 @@ protected SubClusterId getSubClusterForUnResolvedRequest( Map activeClusters = null; try { activeClusters = getActiveSubclusters(); - } catch (YarnException e) { + } catch (Exception e) { throw new RuntimeException(e); } // The randomly selected sub-cluster should at least be active diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java index b1cbf2ce95efb..a9d3ede707f13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -59,7 +60,7 @@ public abstract class BaseRouterPoliciesTest extends BaseFederationPoliciesTest { @Test - public void testNullQueueRouting() throws YarnException { + public void testNullQueueRouting() throws YarnException, IOException { FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); ApplicationSubmissionContext applicationSubmissionContext = ApplicationSubmissionContext.newInstance(null, null, null, null, null, @@ -77,7 +78,7 @@ public void testNullAppContext() throws YarnException { } @Test - public void testBlacklistSubcluster() throws YarnException { + public void testBlacklistSubcluster() throws YarnException, IOException { FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); ApplicationSubmissionContext applicationSubmissionContext = ApplicationSubmissionContext.newInstance(null, null, null, null, null, @@ -127,7 +128,7 @@ public void testAllBlacklistSubcluster() throws YarnException { localPolicy.getHomeSubcluster(applicationSubmissionContext, blacklistSubclusters); fail(); - } catch (YarnException e) { + } catch (Exception e) { assertTrue(e.getMessage() .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); } @@ -167,7 +168,7 @@ public void testUnknownReservation() throws Exception { } @Test - public void testFollowReservation() throws YarnException { + public void testFollowReservation() throws YarnException, IOException { long now = Time.now(); ReservationSubmissionRequest resReq = getReservationSubmissionRequest(); @@ -201,7 +202,7 @@ public void testFollowReservation() throws YarnException { } @Test - public void testUpdateReservation() throws YarnException { + public void testUpdateReservation() throws YarnException, IOException { long now = Time.now(); ReservationSubmissionRequest resReq = getReservationSubmissionRequest(); when(resReq.getQueue()).thenReturn("queue1"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java index 2badd2dffbbe5..17ca342c61219 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestHashBasedRouterPolicy.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -54,7 +55,7 @@ public void setUp() throws Exception { } @Test - public void testHashSpreadUniformlyAmongSubclusters() throws YarnException { + public void testHashSpreadUniformlyAmongSubclusters() throws YarnException, IOException { SubClusterId chosen; Map counter = new HashMap<>(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java index 2fe73669024e8..07ee2f2bc5b95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -109,7 +110,7 @@ public String generateClusterMetricsInfo(int id) { } @Test - public void testLoadIsRespected() throws YarnException { + public void testLoadIsRespected() throws YarnException, IOException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) .getHomeSubcluster(getApplicationSubmissionContext(), null); @@ -141,13 +142,13 @@ public void testIfNoSubclustersWithWeightOne() throws Exception { FederationPoliciesTestUtil.initializePolicyContext(policy, getPolicyInfo(), getActiveSubclusters()); - LambdaTestUtils.intercept(YarnException.class, "Zero Active Subcluster with weight 1.", + LambdaTestUtils.intercept(IOException.class, "Zero Active Subcluster with weight 1.", () -> ((FederationRouterPolicy) policy). getHomeSubcluster(getApplicationSubmissionContext(), null)); } @Test - public void testUpdateReservation() throws YarnException { + public void testUpdateReservation() throws YarnException, IOException { long now = Time.now(); ReservationSubmissionRequest resReq = getReservationSubmissionRequest(); when(resReq.getQueue()).thenReturn("queue1"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java index 93230e568591d..8c45518ceae01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLocalityRouterPolicy.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -99,7 +100,7 @@ private void initializePolicy(Configuration conf) throws YarnException { * the node belongs to an active subcluster. */ @Test - public void testNodeInActiveSubCluster() throws YarnException { + public void testNodeInActiveSubCluster() throws YarnException, IOException { List requests = new ArrayList(); requests.add(ResourceRequest .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), @@ -135,7 +136,7 @@ public void testNodeInActiveSubCluster() throws YarnException { * TestWeightedRandomRouterPolicy. */ @Test - public void testMultipleResourceRequests() throws YarnException { + public void testMultipleResourceRequests() throws YarnException, IOException { List requests = new ArrayList(); requests.add(ResourceRequest .newInstance(Priority.UNDEFINED, "node1", Resource.newInstance(10, 1), @@ -161,7 +162,7 @@ public void testMultipleResourceRequests() throws YarnException { * the node does not exist in the Resolver MachineList file. */ @Test - public void testNodeNotExists() throws YarnException { + public void testNodeNotExists() throws YarnException, IOException { List requests = new ArrayList(); boolean relaxLocality = true; requests.add(ResourceRequest @@ -190,7 +191,7 @@ public void testNodeNotExists() throws YarnException { * the node is in a blacklist subclusters. */ @Test - public void testNodeInABlacklistSubCluster() throws YarnException { + public void testNodeInABlacklistSubCluster() throws YarnException, IOException { // Blacklist SubCluster3 String subClusterToBlacklist = "subcluster3"; // Remember the current value of subcluster3 @@ -239,7 +240,7 @@ public void testNodeInABlacklistSubCluster() throws YarnException { * the node is not in the policy weights. */ @Test - public void testNodeNotInPolicy() throws YarnException { + public void testNodeNotInPolicy() throws YarnException, IOException { // Blacklist SubCluster3 String subClusterToBlacklist = "subcluster3"; // Remember the current value of subcluster3 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java index 2304826a3e3e0..159e6002e41aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java @@ -18,13 +18,14 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import java.io.IOException; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; -import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -78,7 +79,7 @@ public void setUp() throws Exception { } @Test - public void testPickLowestWeight() throws YarnException { + public void testPickLowestWeight() throws YarnException, IOException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) .getHomeSubcluster(getApplicationSubmissionContext(), null); assertEquals("sc5", chosen.getId()); @@ -104,7 +105,7 @@ public void testZeroSubClustersWithPositiveWeight() throws Exception { FederationPoliciesTestUtil.initializePolicyContext(getPolicy(), getPolicyInfo(), getActiveSubclusters()); - intercept(FederationPolicyException.class, + intercept(RetriableException.class, "No Active Subcluster with weight vector greater than zero.", () -> ((FederationRouterPolicy) getPolicy()) .getHomeSubcluster(getApplicationSubmissionContext(), null)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java index bc9b2e1e0690b..4ac1ad1144257 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java @@ -29,6 +29,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; + import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -55,7 +57,7 @@ public void setUp() throws Exception { } @Test - public void testOneSubclusterIsChosen() throws YarnException { + public void testOneSubclusterIsChosen() throws YarnException, IOException { SubClusterId chosen = ((FederationRouterPolicy) getPolicy()) .getHomeSubcluster(getApplicationSubmissionContext(), null); assertTrue(getActiveSubclusters().keySet().contains(chosen)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java index d3cf6612e7888..15cf58d488cd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -90,7 +91,7 @@ public void configureWeights(float numSubClusters) { } @Test - public void testClusterChosenWithRightProbability() throws YarnException { + public void testClusterChosenWithRightProbability() throws YarnException, IOException { ApplicationSubmissionContext context = mock(ApplicationSubmissionContext.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 86d78f2fc351a..5c10988bff72d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -762,12 +762,12 @@ public AllocateResponse allocate(AllocateRequest request) } } - try { - // Split the heart beat request into multiple requests, one for each - // sub-cluster RM that is used by this application. - Map requests = - splitAllocateRequest(request); + // Split the heart beat request into multiple requests, one for each + // sub-cluster RM that is used by this application. + Map requests = + splitAllocateRequest(request); + try { /** * Send the requests to the all sub-cluster resource managers. All * requests are synchronously triggered but sent asynchronously. Later the @@ -1143,7 +1143,7 @@ private SubClusterId getSubClusterForNode(String nodeName) { * sub-cluster RM. */ private Map splitAllocateRequest( - AllocateRequest request) throws YarnException { + AllocateRequest request) throws YarnException, IOException { Map requestMap = new HashMap<>(); // Create heart beat request for home sub-cluster resource manager @@ -1789,9 +1789,10 @@ private boolean warnIfNotExists(ContainerId containerId, String actionName) { * @param askList the ask list to split * @return the split asks * @throws YarnException if split fails + * @throws IOException if no active subclusters. */ protected Map> splitResourceRequests( - List askList) throws YarnException { + List askList) throws YarnException, IOException { return policyInterpreter.splitResourceRequests(askList, getTimedOutSCs(true)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 81cfb2e743b7a..de4ad3f6b80dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -1797,6 +1797,7 @@ void setDB(DB testDb) { * 4) Within a major upgrade, say 1.2 to 2.0: * throw exception and indicate user to use a separate upgrade tool to * upgrade NM state or remove incompatible old state. + * @throws IOException if NM state version is incompatible. */ protected void checkVersion() throws IOException { Version loadedVersion = loadVersion(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 6884381e99a5c..17de738e1eb19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.lang.reflect.Method; @@ -306,10 +307,10 @@ protected ApplicationClientProtocol getClientRMProxyForSubCluster( } private SubClusterId getRandomActiveSubCluster( - Map activeSubClusters) throws YarnException { + Map activeSubClusters) throws IOException { if (activeSubClusters == null || activeSubClusters.isEmpty()) { - RouterServerUtil.logAndThrowException( - FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null); + throw new RetriableException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); } List list = new ArrayList<>(activeSubClusters.keySet()); return list.get(rand.nextInt(list.size())); @@ -364,6 +365,12 @@ public GetNewApplicationResponse getNewApplication( routerMetrics.succeededAppsCreated(stopTime - startTime); return response; } + } catch (RetriableException e) { + routerMetrics.incrAppsFailedCreated(); + RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, e.getMessage()); + LOG.error(e.getMessage()); + throw e; } catch (Exception e) { routerMetrics.incrAppsFailedCreated(); RouterAuditLogger.logFailure(user.getShortUserName(), GET_NEW_APP, UNKNOWN, @@ -523,6 +530,12 @@ public SubmitApplicationResponse submitApplication( return response; } + } catch (RetriableException e) { + routerMetrics.incrAppsFailedSubmitted(); + RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, + TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId); + LOG.error(e.getMessage()); + throw e; } catch (Exception e) { routerMetrics.incrAppsFailedSubmitted(); RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index f28fc2691caa1..95ba63bade593 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -405,8 +405,8 @@ public Response createNewApplication(HttpServletRequest hsr) routerMetrics.succeededAppsCreated(stopTime - startTime); return response; } - } catch (FederationPolicyException e) { - // If a FederationPolicyException is thrown, the service is unavailable. + } catch (IOException e) { + // If a IOException is thrown, the service is unavailable. routerMetrics.incrAppsFailedCreated(); RouterAuditLogger.logFailure(getUser().getShortUserName(), GET_NEW_APP, UNKNOWN, TARGET_WEB_SERVICE, e.getLocalizedMessage()); @@ -2466,7 +2466,7 @@ public Response createNewReservation(HttpServletRequest hsr) private Response invokeCreateNewReservation(Map subClustersActive, List blackList, HttpServletRequest hsr, int retryCount) - throws YarnException { + throws YarnException, IOException { SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive, blackList); LOG.info("createNewReservation try #{} on SubCluster {}.", retryCount, subClusterId); SubClusterInfo subClusterInfo = subClustersActive.get(subClusterId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 6ec253a8f7725..6869c5cc6ca80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -193,7 +193,7 @@ public void testGetNewApplicationOneBadSC(String policyManagerName) throws Excep setupCluster(Arrays.asList(bad2)); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); - LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(IOException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.getNewApplication(request)); } @@ -209,7 +209,7 @@ public void testGetNewApplicationTwoBadSCs(String policyManagerName) throws Exce setupCluster(Arrays.asList(bad1, bad2)); GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); - LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(IOException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.getNewApplication(request)); } @@ -247,7 +247,7 @@ public void testSubmitApplicationOneBadSC(String policyManagerName) throws Excep ApplicationId.newInstance(System.currentTimeMillis(), 1); final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); - LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(IOException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.submitApplication(request)); } @@ -277,7 +277,7 @@ public void testSubmitApplicationTwoBadSCs(String policyManagerName) throws Exce ApplicationId.newInstance(System.currentTimeMillis(), 1); final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); - LambdaTestUtils.intercept(YarnException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, + LambdaTestUtils.intercept(IOException.class, NO_ACTIVE_SUBCLUSTER_AVAILABLE, () -> interceptor.submitApplication(request)); }