
Commit 5828f41

Hieu Huynh authored and tgravescs committed
[SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success
**Description** Currently Speculative tasks that didn't commit can show up as success (depending on timing of commit). This is a bit confusing because that task didn't really succeed in the sense it didn't write anything. I think these tasks should be marked as KILLED or something that is more obvious to the user exactly what happened. it is happened to hit the timing where it got a commit denied exception then it shows up as failed and counts against your task failures. It shouldn't count against task failures since that failure really doesn't matter. MapReduce handles these situation so perhaps we can look there for a model. <img width="1420" alt="unknown" src="https://user-images.githubusercontent.com/15680678/42013170-99db48c2-7a61-11e8-8c7b-ef94c84e36ea.png"> **How can this issue happen?** When both attempts of a task finish before the driver sends command to kill one of them, both of them send the status update FINISHED to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful task, it sends the command to kill the other copy of the task, however, because that task is already finished, the executor will ignore the command. After finishing handling the first attempt, it processes the second one, although all actions on the result of this task are skipped, this copy of the task is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it might cause confusing to user because both of them appear to be successful. **How does this PR fix the issue?** The simple way to fix this issue is that when taskSetManager handles successful task, it checks if any other attempt succeeded. If this is the case, it will call handleFailedTask with state==KILLED and reason==TaskKilled(“another attempt succeeded”) to handle this task as begin killed. **How was this patch tested?** I tested this manually by running applications, that caused the issue before, a few times, and observed that the issue does not happen again. Also, I added a unit test in TaskSetManagerSuite to test that if we call handleSuccessfulTask to handle status update for 2 copies of a task, only the one that is handled first will be mark as SUCCESS Author: Hieu Huynh <“[email protected]”> Author: hthuynh2 <[email protected]> Closes apache#21653 from hthuynh2/SPARK_13343.
1 parent ee5a5a0 commit 5828f41

File tree

2 files changed: +88 -1 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 18 additions & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
-import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap

 /**
@@ -728,6 +728,23 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      // Undo the effect on calculatedTasks and totalResultSize made earlier when
+      // checking if can fetch more results
+      calculatedTasks -= 1
+      val resultSizeAcc = result.accumUpdates.find(a =>
+        a.name == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
+      }
+
+      // Handle this task as a killed task
+      handleFailedTask(tid, TaskState.KILLED,
+        TaskKilled("Finish but did not commit due to another attempt succeeded"))
+      return
+    }
+
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
       successfulTaskDurations.insert(info.duration)
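The new branch decrements calculatedTasks and totalResultSize because both were already bumped when the duplicate result was fetched and checked against the driver's result-size limit (the size being read back here from the RESULT_SIZE internal accumulator). Below is a rough, self-contained sketch of that bookkeeping under stated assumptions; `ResultAccountingSketch` and its methods are simplified illustrations, not the real TaskSetManager / canFetchMoreResults code.

```scala
// Simplified, illustrative sketch of driver-side result-size accounting; not
// the actual Spark implementation.
class ResultAccountingSketch(maxResultSize: Long) {
  private var calculatedTasks = 0
  private var totalResultSize = 0L

  // Called when a task result is fetched on the driver; returns false once the
  // accumulated result size exceeds the configured limit.
  def recordFetchedResult(size: Long): Boolean = {
    totalResultSize += size
    calculatedTasks += 1
    maxResultSize <= 0 || totalResultSize <= maxResultSize
  }

  // Called when a fetched result turns out to belong to a losing speculative
  // attempt: its size should no longer count toward the limit.
  def rollback(size: Long): Unit = {
    totalResultSize -= size
    calculatedTasks -= 1
  }
}
```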

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 70 additions & 0 deletions
@@ -1532,4 +1532,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
   }
+
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation", "true")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 3 tasks and leave 1 task in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3))
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val task5 = taskOption5.get
+    assert(task5.index === 3)
+    assert(task5.taskId === 4)
+    assert(task5.executorId === "exec1")
+    assert(task5.attemptNumber === 1)
+    sched.backend = mock(classOf[SchedulerBackend])
+    sched.dagScheduler.stop()
+    sched.dagScheduler = mock(classOf[DAGScheduler])
+    // Complete one attempt for the running task
+    val result = createTaskResult(3, accumUpdatesByTask(3))
+    manager.handleSuccessfulTask(3, result)
+    // There is a race between the scheduler asking to kill the other task, and that task
+    // actually finishing. We simulate what happens if the other task finishes before we kill it.
+    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
+    manager.handleSuccessfulTask(4, result)
+
+    val info3 = manager.taskInfos(3)
+    val info4 = manager.taskInfos(4)
+    assert(info3.successful)
+    assert(info4.killed)
+    verify(sched.dagScheduler).taskEnded(
+      manager.tasks(3),
+      TaskKilled("Finish but did not commit due to another attempt succeeded"),
+      null,
+      Seq.empty,
+      info4)
+    verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(),
+      result.accumUpdates, info3)
+  }
 }
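The user-visible effect of this change is what listeners and the UI report for the losing speculative copy: TaskKilled("another attempt succeeded") rather than Success. As a hedged illustration (not part of this PR), one could observe this with a custom SparkListener registered via sc.addSparkListener; the `SpeculationOutcomeListener` name below is hypothetical.

```scala
// Illustrative listener (not part of the PR): after this fix, the losing
// speculative copy is reported to listeners as TaskKilled, not Success.
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.{Success, TaskKilled}

class SpeculationOutcomeListener extends SparkListener {
  var succeeded = 0
  var killedCopies = 0

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.reason match {
      case Success        => succeeded += 1
      case _: TaskKilled  => killedCopies += 1
      case _              => // other end reasons ignored in this sketch
    }
  }
}
```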
