Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ protected static RetryPolicy createRetryPolicy(Configuration conf,

return RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts,
failoverSleepBaseMs, failoverSleepMaxMs);
maxFailoverAttempts, failoverSleepBaseMs, failoverSleepMaxMs);
}

if (rmConnectionRetryIntervalMS < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.hadoop.yarn.server.federation.policies;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;

Expand Down Expand Up @@ -138,15 +139,16 @@ public void setPolicyContext(
* @return the map of ids to info for all active subclusters.
*
* @throws YarnException if we can't get the list.
* @throws IOException if there are no active subclusters.
*/
protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
throws YarnException {
throws YarnException, IOException {

Map<SubClusterId, SubClusterInfo> activeSubclusters =
getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);

if (activeSubclusters == null || activeSubclusters.size() < 1) {
throw new NoActiveSubclustersException(
throw new RetriableException(
"Zero active subclusters, cannot pick where to send job.");
}
return activeSubclusters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.yarn.server.federation.policies;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -25,10 +26,10 @@

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
Expand Down Expand Up @@ -185,12 +186,12 @@ public static FederationAMRMProxyPolicy loadAMRMPolicy(String queue,
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
* @throws FederationPolicyException if there are no usable subclusters.
* @throws IOException if there are no usable subclusters.
*/
public static void validateSubClusterAvailability(
Collection<SubClusterId> activeSubClusters,
Collection<SubClusterId> blackListSubClusters)
throws FederationPolicyException {
throws IOException {
if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
if (blackListSubClusters == null) {
return;
Expand All @@ -202,7 +203,7 @@ public static void validateSubClusterAvailability(
}
}
}
throw new FederationPolicyException(
throw new RetriableException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.yarn.server.federation.policies;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
Expand Down Expand Up @@ -124,10 +125,11 @@ public RouterPolicyFacade(Configuration conf,
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this app.
* @throws IOException if there are no active subclusters.
*/
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
List<SubClusterId> blackListSubClusters) throws YarnException, IOException {

// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
Expand Down Expand Up @@ -231,9 +233,10 @@ public synchronized void reset() {
*
* @throws YarnException if there are issues initializing policies, or no
* valid sub-cluster id could be found for this reservation.
* @throws IOException if there are no active subclusters.
*/
public SubClusterId getReservationHomeSubCluster(
ReservationSubmissionRequest request) throws YarnException {
ReservationSubmissionRequest request) throws YarnException, IOException {

// the maps are concurrent, but we need to protect from reset()
// reinitialization mid-execution by creating a new reference local to this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void reinitialize(
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {

Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -46,10 +47,11 @@ public interface FederationAMRMProxyPolicy
* list of {@link ResourceRequest}s that should be forwarded to it
* @throws YarnException in case the request is malformed or no viable
* sub-clusters can be found.
* @throws IOException if there are no active subclusters.
*/
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException;
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException;

/**
* This method should be invoked to notify the policy about responses being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
Expand Down Expand Up @@ -57,14 +59,14 @@ public void reinitialize(
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {
if (homeSubcluster == null) {
throw new FederationPolicyException("No home subcluster available");
}

Map<SubClusterId, SubClusterInfo> active = getActiveSubclusters();
if (!active.containsKey(homeSubcluster)) {
throw new FederationPolicyException(
throw new RetriableException(
"The local subcluster " + homeSubcluster + " is not active");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -34,6 +35,7 @@

import org.apache.commons.collections4.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
Expand All @@ -46,7 +48,6 @@
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
Expand Down Expand Up @@ -267,7 +268,7 @@ public void notifyOfResponse(SubClusterId subClusterId,
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
Set<SubClusterId> timedOutSubClusters) throws YarnException, IOException {

// object used to accumulate statistics about the answer, initialize with
// active subclusters. Create a new instance per call because this method
Expand Down Expand Up @@ -712,7 +713,8 @@ protected final class AllocationBookkeeper {

private void reinitialize(
Map<SubClusterId, SubClusterInfo> activeSubclusters,
Set<SubClusterId> timedOutSubClusters, Configuration pConf) throws YarnException {
Set<SubClusterId> timedOutSubClusters, Configuration pConf)
throws YarnException, IOException {

if (MapUtils.isEmpty(activeSubclusters)) {
throw new YarnRuntimeException("null activeSubclusters received");
Expand Down Expand Up @@ -752,7 +754,7 @@ private void reinitialize(
String errorMsg = "None of the subClusters enabled in this Policy (weight > 0) are "
+ "currently active we cannot forward the ResourceRequest(s)";
if (failOnError) {
throw new NoActiveSubclustersException(errorMsg);
throw new RetriableException(errorMsg);
} else {
LOG.error(errorMsg + ", continuing by enabling all active subClusters.");
activeAndEnabledSC.addAll(activeSubclusters.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Collections;
Expand Down Expand Up @@ -82,9 +83,11 @@ public void validate(ApplicationSubmissionContext appSubmissionContext)
* @return the chosen sub-cluster
*
* @throws YarnException if the policy fails to choose a sub-cluster
* @throws IOException if there are no active subclusters.
*/
protected abstract SubClusterId chooseSubCluster(String queue,
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException;
Map<SubClusterId, SubClusterInfo> preSelectSubClusters)
throws YarnException, IOException;

/**
* Filter chosen SubCluster based on reservationId.
Expand Down Expand Up @@ -130,11 +133,11 @@ protected Map<SubClusterId, SubClusterInfo> prefilterSubClusters(
* @return a hash-based chosen {@link SubClusterId} that will be the "home"
* for this application.
*
* @throws YarnException if there are no active subclusters.
* @throws IOException if there are no active subclusters.
*/
@Override
public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
List<SubClusterId> blackLists) throws YarnException {
List<SubClusterId> blackLists) throws YarnException, IOException {

// null checks and default-queue behavior
validate(appContext);
Expand Down Expand Up @@ -169,7 +172,7 @@ public SubClusterId getHomeSubcluster(ApplicationSubmissionContext appContext,
*/
@Override
public SubClusterId getReservationHomeSubcluster(ReservationSubmissionRequest request)
throws YarnException {
throws YarnException, IOException {
if (request == null) {
throw new FederationPolicyException("The ReservationSubmissionRequest cannot be null.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
Expand Down Expand Up @@ -46,10 +47,11 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
* application.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
* @throws IOException if there are no active subclusters.
*/
SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException;
List<SubClusterId> blackListSubClusters) throws YarnException, IOException;

/**
* Determines the sub-cluster where a ReservationSubmissionRequest should be
Expand All @@ -59,7 +61,8 @@ SubClusterId getHomeSubcluster(
* @return the sub-cluster chosen as the home for the reservation request
*
* @throws YarnException if the policy fails to choose a sub-cluster
* @throws IOException if there are no active subclusters.
*/
SubClusterId getReservationHomeSubcluster(
ReservationSubmissionRequest request) throws YarnException;
ReservationSubmissionRequest request) throws YarnException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
Expand Down Expand Up @@ -62,7 +63,8 @@ public void reinitialize(FederationPolicyInitializationContext policyContext)

@Override
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters)
throws YarnException, IOException {
Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;
long currBestMem = -1;
Expand All @@ -77,7 +79,7 @@ protected SubClusterId chooseSubCluster(
}
}
if (chosen == null) {
throw new FederationPolicyException(
throw new RetriableException(
"Zero Active Subcluster with weight 1.");
}
return chosen.toId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -90,7 +91,7 @@ public void reinitialize(FederationPolicyInitializationContext policyContext)
@Override
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext,
List<SubClusterId> blackListSubClusters) throws YarnException {
List<SubClusterId> blackListSubClusters) throws YarnException, IOException {

// null checks and default-queue behavior
validate(appSubmissionContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.hadoop.yarn.server.federation.policies.router;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
Expand All @@ -34,7 +35,8 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {

@Override
protected SubClusterId chooseSubCluster(
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters) throws YarnException {
String queue, Map<SubClusterId, SubClusterInfo> preSelectSubclusters)
throws YarnException, IOException {
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights = getPolicyInfo().getRouterPolicyWeights();
Expand All @@ -48,7 +50,7 @@ protected SubClusterId chooseSubCluster(
}
}
if (chosen == null) {
throw new FederationPolicyException(
throw new RetriableException(
"No Active Subcluster with weight vector greater than zero.");
}
return chosen;
Expand Down
Loading