
Commit b6b8a66

zsxwing authored and gatorsmile committed
[SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator
## What changes were proposed in this pull request?

Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining accumulators so that they can still report correct values.

## How was this patch tested?

The new unit test.

Closes apache#22586 from zsxwing/SPARK-25568.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent f4b1380 commit b6b8a66

File tree

3 files changed: +38 -6 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 14 additions & 6 deletions

@@ -1245,9 +1245,10 @@ private[spark] class DAGScheduler(
   private def updateAccumulators(event: CompletionEvent): Unit = {
     val task = event.task
     val stage = stageIdToStage(task.stageId)
-    try {
-      event.accumUpdates.foreach { updates =>
-        val id = updates.id
+
+    event.accumUpdates.foreach { updates =>
+      val id = updates.id
+      try {
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
           case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
@@ -1261,10 +1262,17 @@ private[spark] class DAGScheduler(
           event.taskInfo.setAccumulables(
             acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
+      } catch {
+        case NonFatal(e) =>
+          // Log the class name to make it easy to find the bad implementation
+          val accumClassName = AccumulatorContext.get(id) match {
+            case Some(accum) => accum.getClass.getName
+            case None => "Unknown class"
+          }
+          logError(
+            s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}",
+            e)
       }
-    } catch {
-      case NonFatal(e) =>
-        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
     }
   }
 
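As a reading aid, here is a minimal standalone Scala sketch of the pattern the hunks above introduce: the try/catch moves inside the per-accumulator loop, so a failing merge is logged and skipped while the remaining accumulators are still updated. Everything in this sketch (object, class, and method names) is hypothetical and not part of Spark's code.

import scala.util.control.NonFatal

object PerUpdateIsolationSketch {

  // Stand-in for an accumulator whose merge may throw, like a buggy AccumulatorV2.
  final class Counter(val id: Long, failOnMerge: Boolean = false) {
    private var total = 0L
    def value: Long = total
    def merge(delta: Long): Unit = {
      if (failOnMerge) throw new IllegalStateException(s"bad merge in accumulator $id")
      total += delta
    }
  }

  // The pattern from the patch: each update gets its own try/catch, so one failure
  // is logged and skipped instead of aborting the updates that follow it.
  def applyUpdates(accums: Seq[Counter], updates: Map[Long, Long]): Unit = {
    accums.foreach { acc =>
      try {
        updates.get(acc.id).foreach(acc.merge)
      } catch {
        case NonFatal(e) =>
          // Log the class name to make it easy to find the bad implementation
          println(s"Failed to update accumulator ${acc.id} (${acc.getClass.getName}): $e")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val accums = Seq(new Counter(1L, failOnMerge = true), new Counter(2L))
    applyUpdates(accums, Map(1L -> 5L, 2L -> 5L))
    accums.foreach(a => println(s"accumulator ${a.id} = ${a.value}"))
    // accumulator 1 = 0 (its merge failed), accumulator 2 = 5 (still updated)
  }
}

With the previous code, the single try around the whole loop meant the first failing merge skipped every accumulator that came after it in event.accumUpdates.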

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 20 additions & 0 deletions

@@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  test("misbehaved accumulator should not impact other accumulators") {
+    val bad = new LongAccumulator {
+      override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
+        throw new DAGSchedulerSuiteDummyException
+      }
+    }
+    sc.register(bad, "bad")
+    val good = sc.longAccumulator("good")
+
+    sc.parallelize(1 to 10, 2).foreach { item =>
+      bad.add(1)
+      good.add(1)
+    }
+
+    // This is to ensure the `bad` accumulator did fail to update its value
+    assert(bad.value == 0L)
+    // Should be able to update the "good" accumulator
+    assert(good.value == 10L)
+  }
+
   /**
    * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
    * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
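A note on the assertions in this test: the executor-side calls to bad.add(1) succeed, but the driver only folds each task's partial result into the registered accumulator through merge when the task finishes. Because the overridden merge always throws, the driver-side `bad` accumulator keeps its initial value of 0, while `good` is merged normally and reaches 10, and the job itself still succeeds.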

docs/rdd-programming-guide.md

Lines changed: 4 additions & 0 deletions

@@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1");
 
 Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.
 
+*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator.
+If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence,
+a buggy accumulator will not impact a Spark job, but it may not get updated correctly although a Spark job is successful.
+
 </div>
 
 <div data-lang="python" markdown="1">
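For context on the guide passage this warning joins: below is a small sketch of a user-defined AccumulatorV2 in Scala whose result type (Set[String]) differs from its element type (String), with a merge that never throws and so stays out of the failure path the warning describes. The class, object, and variable names are illustrative, not from the Spark codebase or this patch.

import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

// Illustrative accumulator: elements are Strings, the result is a Set[String].
class StringSetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var set = Set.empty[String]

  override def isZero: Boolean = set.isEmpty
  override def copy(): StringSetAccumulator = {
    val newAcc = new StringSetAccumulator
    newAcc.set = set
    newAcc
  }
  override def reset(): Unit = { set = Set.empty[String] }
  override def add(v: String): Unit = { set += v }
  // merge is called on the driver for each finished task's partial result;
  // it unions the two sets and never throws.
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = {
    set ++= other.value
  }
  override def value: Set[String] = set
}

object StringSetAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "accumulator-sketch")
    val distinctWords = new StringSetAccumulator
    sc.register(distinctWords, "distinctWords")
    sc.parallelize(Seq("a", "b", "a", "c"), 2).foreach(w => distinctWords.add(w))
    println(distinctWords.value)  // Set(a, b, c)
    sc.stop()
  }
}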
