@@ -247,6 +247,11 @@ public static void cancelShuffle(int shuffleId, String reason) {
.hiddenImpl(TaskSetManager.class, "taskInfos")
.defaultAlwaysNull()
.build();
+  private static final DynFields.UnboundField<int[]> TASK_FAILURES =
+      DynFields.builder()
+          .hiddenImpl(TaskSetManager.class, "numFailures")
+          .defaultAlwaysNull()
+          .build();

/**
* TaskSetManager - it is not designed to be used outside the spark scheduler. Please be careful.
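The TASK_FAILURES field added above uses DynFields to reach Spark's private `TaskSetManager.numFailures` array, the per-index count of failed attempts that Spark itself maintains. A plain-reflection sketch of roughly what the bound field resolves to (illustrative only, not the DynFields implementation; that `numFailures` is a private `Array[Int]` on TaskSetManager is an assumption about Spark internals):

```java
import java.lang.reflect.Field;
import org.apache.spark.scheduler.TaskSetManager;

final class NumFailuresReflectionSketch {
  // Roughly what TASK_FAILURES.bind(tsm).get() does: read the private
  // numFailures array (Scala Array[Int], surfaced to Java as int[]).
  static int[] read(TaskSetManager tsm) {
    try {
      Field f = TaskSetManager.class.getDeclaredField("numFailures");
      f.setAccessible(true); // hiddenImpl(): the member is not public
      return (int[]) f.get(tsm);
    } catch (ReflectiveOperationException e) {
      return null; // defaultAlwaysNull(): a missing field degrades to null
    }
  }
}
```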
@@ -284,6 +289,39 @@ protected static Tuple2<TaskInfo, List<TaskInfo>> getTaskAttempts(
}
}

+  /**
+   * Gets the number of task attempts that have already failed for the given task index. Note: This
+   * count does NOT include the current failure. To get the total failure count including the
+   * current attempt, you need to add 1 to the returned value.
+   *
+   * @param taskSetManager the TaskSetManager to query
+   * @param index the task index
+   * @return the number of previous failed attempts, or -1 if an error occurs
+   */
+  @VisibleForTesting
+  protected static int getTaskFailureCount(TaskSetManager taskSetManager, int index) {
+    if (taskSetManager == null) {
+      logger.error("TaskSetManager is null for task index: {}", index);
+      return -1;
+    }
+
+    int[] numFailures = TASK_FAILURES.bind(taskSetManager).get();
+    if (numFailures == null) {
+      logger.error("Failed to get numFailures array from TaskSetManager for task index: {}", index);
+      return -1;
+    }
+
+    if (index < 0 || index >= numFailures.length) {
+      logger.error(
+          "Task index {} is out of bounds for numFailures array (length: {})",
+          index,
+          numFailures.length);
+      return -1;
+    }
+
+    return numFailures[index];
+  }
+
protected static Map<String, Set<Long>> reportedStageShuffleFetchFailureTaskIds =
JavaUtils.newConcurrentHashMap();

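Because getTaskFailureCount is @VisibleForTesting, its soft-failure contract can be pinned down in a unit test. A minimal sketch, assuming the enclosing class is SparkUtils and the test lives in the same package (the method is protected); the test class name and assertions are illustrative, not part of this change:

```java
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class GetTaskFailureCountSketch {
  @Test
  public void returnsMinusOneOnErrors() {
    // A null TaskSetManager fails soft instead of throwing.
    assertEquals(-1, SparkUtils.getTaskFailureCount(null, 0));
    // With a real TaskSetManager of N tasks, out-of-range indices
    // (index < 0 or index >= N) are likewise expected to yield -1,
    // and a valid index returns numFailures[index], which excludes
    // the failure currently being handled.
  }
}
```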
@@ -335,7 +373,6 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
if (taskAttempts == null) return true;

TaskInfo taskInfo = taskAttempts._1();
-    int failedTaskAttempts = 1;
boolean hasRunningAttempt = false;
for (TaskInfo ti : taskAttempts._2()) {
if (ti.taskId() != taskId) {
@@ -347,7 +384,6 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
taskId,
taskInfo.attemptNumber(),
ti.attemptNumber());
-          failedTaskAttempts += 1;
} else if (ti.successful()) {
logger.info(
"StageId={} index={} taskId={} attempt={} another attempt {} is successful.",
@@ -366,36 +402,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
taskInfo.attemptNumber(),
ti.attemptNumber());
hasRunningAttempt = true;
} else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
// For KILLED state task, Spark does not count the number of failures
// For UNKNOWN state task, Spark does count the number of failures
// For FAILED state task, Spark decides whether to count the failure based on the
// different failure reasons. Since we cannot obtain the failure
// reason here, we will count all tasks in FAILED state.
logger.info(
"StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
stageId,
taskInfo.index(),
taskId,
taskInfo.attemptNumber(),
ti.attemptNumber(),
ti.status());
failedTaskAttempts += 1;
}
}
}
// The following situations should trigger a FetchFailed exception:
-    // 1. If failedTaskAttempts >= maxTaskFails
-    // 2. If no other taskAttempts are running
-    if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+    // 1. If total failures (previous failures + current failure) >= maxTaskFails
+    // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+    //    to keep the same behavior as Spark.
+    // Note: previousFailureCount does NOT include the current failure,
+    //    so we add 1 to count the current attempt before comparing
+    //    against maxTaskFails.
+    int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+    if (previousFailureCount < 0) {
+      return true;
+    }
+    if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {
logger.warn(
"StageId={}, index={}, taskId={}, attemptNumber={}: Task failure count {} reached "
+ "maximum allowed failures {} or no running attempt exists.",
"StageId={}, index={}, taskId={}, attemptNumber={}: Previous failure count {} "
+ "(total with current: {}) reached maximum allowed failures {} "
+ "or no running attempt exists.",
stageId,
taskInfo.index(),
taskId,
taskInfo.attemptNumber(),
-          failedTaskAttempts,
+          previousFailureCount,
+          previousFailureCount + 1,
maxTaskFails);
return true;
} else {
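To make the off-by-one handling concrete, here is a worked example of the new check, assuming maxTaskFails carries Spark's default spark.task.maxFailures = 4:

```java
final class FetchFailureThresholdExample {
  // previousFailureCount comes from numFailures[index]; the current failure
  // is not yet recorded there, hence the +1.
  static boolean wouldReport(int previousFailureCount, boolean hasRunningAttempt) {
    int maxTaskFails = 4; // assumed: spark.task.maxFailures default
    return previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt;
  }
}
// wouldReport(2, true)  -> false: 3 < 4 and another attempt is still running
// wouldReport(3, true)  -> true:  this failure is the 4th overall, report FetchFailed
// wouldReport(0, false) -> true:  no attempt left running, matching Spark's behavior
```

The same change is applied to the second copy of this class below, which uses the LOG field instead of logger.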
@@ -362,6 +362,11 @@ public static void cancelShuffle(int shuffleId, String reason) {
.hiddenImpl(TaskSetManager.class, "taskInfos")
.defaultAlwaysNull()
.build();
+  private static final DynFields.UnboundField<int[]> TASK_FAILURES =
+      DynFields.builder()
+          .hiddenImpl(TaskSetManager.class, "numFailures")
+          .defaultAlwaysNull()
+          .build();

/**
* TaskSetManager - it is not designed to be used outside the spark scheduler. Please be careful.
@@ -399,6 +404,39 @@ protected static Tuple2<TaskInfo, List<TaskInfo>> getTaskAttempts(
}
}

+  /**
+   * Gets the number of task attempts that have already failed for the given task index. Note: This
+   * count does NOT include the current failure. To get the total failure count including the
+   * current attempt, you need to add 1 to the returned value.
+   *
+   * @param taskSetManager the TaskSetManager to query
+   * @param index the task index
+   * @return the number of previous failed attempts, or -1 if an error occurs
+   */
+  @VisibleForTesting
+  protected static int getTaskFailureCount(TaskSetManager taskSetManager, int index) {
+    if (taskSetManager == null) {
+      LOG.error("TaskSetManager is null for task index: {}", index);
+      return -1;
+    }
+
+    int[] numFailures = TASK_FAILURES.bind(taskSetManager).get();
+    if (numFailures == null) {
+      LOG.error("Failed to get numFailures array from TaskSetManager for task index: {}", index);
+      return -1;
+    }
+
+    if (index < 0 || index >= numFailures.length) {
+      LOG.error(
+          "Task index {} is out of bounds for numFailures array (length: {})",
+          index,
+          numFailures.length);
+      return -1;
+    }
+
+    return numFailures[index];
+  }
+
protected static Map<String, Set<Long>> reportedStageShuffleFetchFailureTaskIds =
JavaUtils.newConcurrentHashMap();

@@ -450,7 +488,6 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
if (taskAttempts == null) return true;

TaskInfo taskInfo = taskAttempts._1();
-    int failedTaskAttempts = 1;
boolean hasRunningAttempt = false;
for (TaskInfo ti : taskAttempts._2()) {
if (ti.taskId() != taskId) {
@@ -462,7 +499,6 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
taskId,
taskInfo.attemptNumber(),
ti.attemptNumber());
-          failedTaskAttempts += 1;
} else if (ti.successful()) {
LOG.info(
"StageId={} index={} taskId={} attempt={} another attempt {} is successful.",
@@ -481,36 +517,31 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
taskInfo.attemptNumber(),
ti.attemptNumber());
hasRunningAttempt = true;
} else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
// For KILLED state task, Spark does not count the number of failures
// For UNKNOWN state task, Spark does count the number of failures
// For FAILED state task, Spark decides whether to count the failure based on the
// different failure reasons. Since we cannot obtain the failure
// reason here, we will count all tasks in FAILED state.
LOG.info(
"StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
stageId,
taskInfo.index(),
taskId,
taskInfo.attemptNumber(),
ti.attemptNumber(),
ti.status());
failedTaskAttempts += 1;
}
}
}
// The following situations should trigger a FetchFailed exception:
-    // 1. If failedTaskAttempts >= maxTaskFails
-    // 2. If no other taskAttempts are running
-    if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
+    // 1. If total failures (previous failures + current failure) >= maxTaskFails
+    // 2. If no other taskAttempts are running, trigger a FetchFailed exception
+    //    to keep the same behavior as Spark.
+    // Note: previousFailureCount does NOT include the current failure,
+    //    so we add 1 to count the current attempt before comparing
+    //    against maxTaskFails.
+    int previousFailureCount = getTaskFailureCount(taskSetManager, taskInfo.index());
+    if (previousFailureCount < 0) {
+      return true;
+    }
+    if (previousFailureCount + 1 >= maxTaskFails || !hasRunningAttempt) {
LOG.warn(
"StageId={}, index={}, taskId={}, attemptNumber={}: Task failure count {} reached "
+ "maximum allowed failures {} or no running attempt exists.",
"StageId={}, index={}, taskId={}, attemptNumber={}: Previous failure count {} "
+ "(total with current: {}) reached maximum allowed failures {} "
+ "or no running attempt exists.",
stageId,
taskInfo.index(),
taskId,
taskInfo.attemptNumber(),
-          failedTaskAttempts,
+          previousFailureCount,
+          previousFailureCount + 1,
maxTaskFails);
return true;
} else {
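Both copies of shouldReportShuffleFetchFailure compare against a maxTaskFails value whose derivation sits outside this diff. Presumably it mirrors Spark's per-task retry limit, along the lines of the following sketch (an assumption, not shown in the PR):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;

final class MaxTaskFailsSketch {
  // Assumed provenance of maxTaskFails (not part of this diff):
  // Spark's per-task retry limit, which defaults to 4.
  static int maxTaskFails() {
    SparkConf conf = SparkEnv.get().conf();
    return conf.getInt("spark.task.maxFailures", 4);
  }
}
```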