Skip to content

Commit 5d6ce21

Browse files
leixm authored and SteNicholas committed
[CELEBORN-1983][FOLLOWUP] Fix fetch fail not throw due to reach spark maxTaskFailures
### What changes were proposed in this pull request? Fix fetch fail not throw due to reach spark maxTaskFailures. ### Why are the changes needed? The condition `ti.attemptNumber() >= maxTaskFails - 1` may not be executed. Suppose that the current `taskAttempts` is index0, index1, index2, and index3, and that index0 and index1 have already failed while index2 and index3 are running, and the current `reportFetchFailed` is index3, then the final result will be false, while the expected result will be true. Therefore, we should check the attemptNumber of the current task separately before the loop starts. <img width="3558" height="608" alt="image" src="https://github.com/user-attachments/assets/2a0af3e7-912e-420e-a864-4c525d07e251" /> <img width="2332" height="814" alt="image" src="https://github.com/user-attachments/assets/bf832091-56d5-41b8-b58a-502e409d67a8" /> ### Does this PR resolve a correctness bug? No. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #3531 from leixm/follow_CELEBORN-1983. Authored-by: Xianming Lei <[email protected]> Signed-off-by: SteNicholas <[email protected]>
1 parent cc0d1ba commit 5d6ce21

File tree

2 files changed

+66
-20
lines changed
  • client-spark

2 files changed

+66
-20
lines changed

client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -335,16 +335,19 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
335335
if (taskAttempts == null) return true;
336336

337337
TaskInfo taskInfo = taskAttempts._1();
338+
int failedTaskAttempts = 1;
339+
boolean hasRunningAttempt = false;
338340
for (TaskInfo ti : taskAttempts._2()) {
339341
if (ti.taskId() != taskId) {
340342
if (reportedStageTaskIds.contains(ti.taskId())) {
341343
logger.info(
342-
"StageId={} index={} taskId={} attempt={} another attempt {} has reported shuffle fetch failure, ignore it.",
344+
"StageId={} index={} taskId={} attempt={} another attempt {} has reported shuffle fetch failure.",
343345
stageId,
344346
taskInfo.index(),
345347
taskId,
346348
taskInfo.attemptNumber(),
347349
ti.attemptNumber());
350+
failedTaskAttempts += 1;
348351
} else if (ti.successful()) {
349352
logger.info(
350353
"StageId={} index={} taskId={} attempt={} another attempt {} is successful.",
@@ -362,22 +365,42 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
362365
taskId,
363366
taskInfo.attemptNumber(),
364367
ti.attemptNumber());
365-
return false;
366-
}
367-
} else {
368-
if (ti.attemptNumber() >= maxTaskFails - 1) {
369-
logger.warn(
370-
"StageId={} index={} taskId={} attemptNumber {} reach maxTaskFails {}.",
368+
hasRunningAttempt = true;
369+
} else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
370+
// For KILLED state task, Spark does not count the number of failures
371+
// For UNKNOWN state task, Spark does count the number of failures
372+
// For FAILED state task, Spark decides whether to count the failure based on the
373+
// different failure reasons. Since we cannot obtain the failure
374+
// reason here, we will count all tasks in FAILED state.
375+
logger.info(
376+
"StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
371377
stageId,
372378
taskInfo.index(),
373379
taskId,
380+
taskInfo.attemptNumber(),
374381
ti.attemptNumber(),
375-
maxTaskFails);
376-
return true;
382+
ti.status());
383+
failedTaskAttempts += 1;
377384
}
378385
}
379386
}
380-
return true;
387+
// The following situations should trigger a FetchFailed exception:
388+
// 1. If failedTaskAttempts >= maxTaskFails
389+
// 2. If no other taskAttempts are running
390+
if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
391+
logger.warn(
392+
"StageId={}, index={}, taskId={}, attemptNumber={}: Task failure count {} reached "
393+
+ "maximum allowed failures {} or no running attempt exists.",
394+
stageId,
395+
taskInfo.index(),
396+
taskId,
397+
taskInfo.attemptNumber(),
398+
failedTaskAttempts,
399+
maxTaskFails);
400+
return true;
401+
} else {
402+
return false;
403+
}
381404
} else {
382405
logger.error(
383406
"Can not get TaskSetManager for taskId: {}, ignore it. (This typically occurs when: "

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -450,16 +450,19 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
450450
if (taskAttempts == null) return true;
451451

452452
TaskInfo taskInfo = taskAttempts._1();
453+
int failedTaskAttempts = 1;
454+
boolean hasRunningAttempt = false;
453455
for (TaskInfo ti : taskAttempts._2()) {
454456
if (ti.taskId() != taskId) {
455457
if (reportedStageTaskIds.contains(ti.taskId())) {
456458
LOG.info(
457-
"StageId={} index={} taskId={} attempt={} another attempt {} has reported shuffle fetch failure, ignore it.",
459+
"StageId={} index={} taskId={} attempt={} another attempt {} has reported shuffle fetch failure.",
458460
stageId,
459461
taskInfo.index(),
460462
taskId,
461463
taskInfo.attemptNumber(),
462464
ti.attemptNumber());
465+
failedTaskAttempts += 1;
463466
} else if (ti.successful()) {
464467
LOG.info(
465468
"StageId={} index={} taskId={} attempt={} another attempt {} is successful.",
@@ -477,22 +480,42 @@ public static boolean shouldReportShuffleFetchFailure(long taskId) {
477480
taskId,
478481
taskInfo.attemptNumber(),
479482
ti.attemptNumber());
480-
return false;
481-
}
482-
} else {
483-
if (ti.attemptNumber() >= maxTaskFails - 1) {
484-
LOG.warn(
485-
"StageId={} index={} taskId={} attemptNumber {} reach maxTaskFails {}.",
483+
hasRunningAttempt = true;
484+
} else if ("FAILED".equals(ti.status()) || "UNKNOWN".equals(ti.status())) {
485+
// For KILLED state task, Spark does not count the number of failures
486+
// For UNKNOWN state task, Spark does count the number of failures
487+
// For FAILED state task, Spark decides whether to count the failure based on the
488+
// different failure reasons. Since we cannot obtain the failure
489+
// reason here, we will count all tasks in FAILED state.
490+
LOG.info(
491+
"StageId={} index={} taskId={} attempt={} another attempt {} status={}.",
486492
stageId,
487493
taskInfo.index(),
488494
taskId,
495+
taskInfo.attemptNumber(),
489496
ti.attemptNumber(),
490-
maxTaskFails);
491-
return true;
497+
ti.status());
498+
failedTaskAttempts += 1;
492499
}
493500
}
494501
}
495-
return true;
502+
// The following situations should trigger a FetchFailed exception:
503+
// 1. If failedTaskAttempts >= maxTaskFails
504+
// 2. If no other taskAttempts are running
505+
if (failedTaskAttempts >= maxTaskFails || !hasRunningAttempt) {
506+
LOG.warn(
507+
"StageId={}, index={}, taskId={}, attemptNumber={}: Task failure count {} reached "
508+
+ "maximum allowed failures {} or no running attempt exists.",
509+
stageId,
510+
taskInfo.index(),
511+
taskId,
512+
taskInfo.attemptNumber(),
513+
failedTaskAttempts,
514+
maxTaskFails);
515+
return true;
516+
} else {
517+
return false;
518+
}
496519
} else {
497520
LOG.error(
498521
"Can not get TaskSetManager for taskId: {}, ignore it. (This typically occurs when: "

0 commit comments

Comments
 (0)