
Commit 3990daa

jinxing authored and squito committed
[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
## What changes were proposed in this pull request?

When a map stage is submitted to the `DAGScheduler` via `SparkContext`'s `submitMapStage`, `markMapStageJobAsFinished` is called in only two places (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314). But consider the following scenario:

1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. We submit stage1 via `submitMapStage`;
3. While stage1 is running, a `FetchFailed` occurs, so stage0 and stage1 are resubmitted as stage0_1 and stage1_1;
4. While stage0_1 is running, speculated tasks from the old stage1 come back as succeeded, but stage1 is no longer in `runningStages`. So even though all splits of stage1 (including the speculated tasks) have succeeded, stage1's job listener is never called;
5. stage0_1 finishes and stage1_1 starts running. `submitMissingTasks` finds no missing tasks, but with the current code the job listener is still not triggered.

We should call the job listener for the map stage in step 5.

## How was this patch tested?

A test case, "Trigger mapstage's job listener in submitMissingTasks", is added to `DAGSchedulerSuite` (see the second file below).

Author: jinxing <[email protected]>

Closes apache#21019 from jinxing64/SPARK-23948.
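To make step 2 above concrete, here is a minimal sketch (not part of this patch) of how a map-stage job and its listener come into existence. `SparkContext.submitMapStage` is `private[spark]`, so the sketch assumes code placed under the `org.apache.spark` package; the object, app, and variable names are illustrative only.

```scala
// Illustrative sketch only; packaging and names are assumptions, not part of this commit.
// SparkContext.submitMapStage is private[spark], so this file must live under org.apache.spark.
package org.apache.spark.demo

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

object SubmitMapStageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("map-stage-sketch"))
    val pairs = sc.parallelize(1 to 100, numSlices = 2).map(i => (i % 10, i))
    val dep = new ShuffleDependency(pairs, new HashPartitioner(2))

    // submitMapStage registers a map-stage job with the DAGScheduler and returns a future that is
    // completed by that job's listener. Step 5 of the scenario above is exactly the case where the
    // listener was never invoked before this patch, so a wait like this one could hang forever.
    val stats = Await.result(sc.submitMapStage(dep), Duration.Inf)
    println(s"Map stage for shuffle ${stats.shuffleId} wrote ${stats.bytesByPartitionId.sum} bytes")

    sc.stop()
  }
}
```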
1 parent ed4101d commit 3990daa

File tree: 2 files changed, +70 / -15 lines


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 18 additions & 15 deletions
@@ -1092,17 +1092,16 @@ class DAGScheduler(
       // the stage as completed here in case there are no tasks to run
       markStageAsFinished(stage, None)
 
-      val debugString = stage match {
+      stage match {
         case stage: ShuffleMapStage =>
-          s"Stage ${stage} is actually done; " +
-            s"(available: ${stage.isAvailable}," +
-            s"available outputs: ${stage.numAvailableOutputs}," +
-            s"partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; " +
+            s"(available: ${stage.isAvailable}," +
+            s"available outputs: ${stage.numAvailableOutputs}," +
+            s"partitions: ${stage.numPartitions})")
+          markMapStageJobsAsFinished(stage)
         case stage : ResultStage =>
-          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
       }
-      logDebug(debugString)
-
       submitWaitingChildStages(stage)
     }
   }
@@ -1307,13 +1306,7 @@ class DAGScheduler(
                 shuffleStage.findMissingPartitions().mkString(", "))
               submitStage(shuffleStage)
             } else {
-              // Mark any map-stage jobs waiting on this stage as finished
-              if (shuffleStage.mapStageJobs.nonEmpty) {
-                val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
-                for (job <- shuffleStage.mapStageJobs) {
-                  markMapStageJobAsFinished(job, stats)
-                }
-              }
+              markMapStageJobsAsFinished(shuffleStage)
               submitWaitingChildStages(shuffleStage)
             }
           }
@@ -1433,6 +1426,16 @@ class DAGScheduler(
     }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+    // Mark any map-stage jobs waiting on this stage as finished
+    if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+      val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+      for (job <- shuffleStage.mapStageJobs) {
+        markMapStageJobAsFinished(job, stats)
+      }
+    }
+  }
+
   /**
    * Responds to an executor being lost. This is called inside the event loop, so it assumes it can
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
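For readers who want the mechanism in isolation, the following is a small, runnable toy model of the bookkeeping described above. It is an assumption-laden sketch, not Spark code: every name in it (`MapStageJob`, `ShuffleMapStageModel`, and so on) is invented for illustration. It shows why a stage that is resubmitted with no missing tasks must still notify the map-stage jobs waiting on it, which is what the new `markMapStageJobsAsFinished` call in `submitMissingTasks` does.

```scala
// Toy model only; names mirror DAGScheduler concepts but this is NOT Spark code.
object MapStageListenerToyModel {
  final case class MapStageJob(name: String, onFinished: () => Unit)

  final class ShuffleMapStageModel(val numPartitions: Int) {
    var availableOutputs: Set[Int] = Set.empty    // partitions with a registered map output
    var mapStageJobs: List[MapStageJob] = Nil     // jobs submitted via "submitMapStage"
    def isAvailable: Boolean = availableOutputs.size == numPartitions
    def findMissingPartitions(): Seq[Int] = (0 until numPartitions).filterNot(availableOutputs)
  }

  // The fix, in miniature: one helper used both on task completion and when a (re)submitted
  // stage turns out to have no missing tasks, so waiting jobs are always notified exactly when
  // all map outputs are available.
  def markMapStageJobsAsFinished(stage: ShuffleMapStageModel): Unit = {
    if (stage.isAvailable && stage.mapStageJobs.nonEmpty) {
      stage.mapStageJobs.foreach(_.onFinished())
      stage.mapStageJobs = Nil
    }
  }

  def main(args: Array[String]): Unit = {
    val stage1 = new ShuffleMapStageModel(numPartitions = 2)
    stage1.mapStageJobs = List(MapStageJob("stage1-job", () => println("stage1 listener fired")))

    // Step 4 of the scenario: speculated tasks of the failed attempt succeed while stage1 is not
    // "running"; outputs get registered, but nothing notifies the waiting job at that point.
    stage1.availableOutputs = Set(0, 1)

    // Step 5: stage1_1 is resubmitted and has no missing tasks. Before the patch nothing happened
    // here; after the patch the helper fires the listener.
    if (stage1.findMissingPartitions().isEmpty) {
      markMapStageJobsAsFinished(stage1)
    }
  }
}
```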

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 52 additions & 0 deletions
@@ -2146,6 +2146,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+    val rdd1 = new MyRDD(sc, 2, Nil)
+    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+    val listener1 = new SimpleListener
+    val listener2 = new SimpleListener
+
+    submitMapStage(dep1, listener1)
+    submitMapStage(dep2, listener2)
+
+    // Complete stage0.
+    assert(taskSets(0).stageId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+    assert(listener1.results.size === 1)
+
+    // When attempting stage1, trigger a fetch failure.
+    assert(taskSets(1).stageId === 1)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+    scheduler.resubmitFailedStages()
+    // Stage1's listener should not have a result yet.
+    assert(listener2.results.size === 0)
+
+    // A speculative task in stage1 succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      Success,
+      makeMapStatus("hostD", rdd2.partitions.length)))
+    // The stage1 listener still should not have a result, even though stage1 has no missing
+    // partitions, because stage1 has failed and is not inside `runningStages` at this moment.
+    assert(listener2.results.size === 0)
+
+    // Stage0 should now be running as task set 2; make its task succeed.
+    assert(taskSets(2).stageId === 0)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+      Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // After stage0 finishes, stage1 is submitted and found to have no missing partitions,
+    // so its listener is triggered.
+    assert(listener2.results.size === 1)
+    assertDataStructuresEmpty()
+  }
+
   /**
    * In this test, we run a map stage where one of the executors fails but we still receive a
    * "zombie" complete message from that executor. We want to make sure the stage is not reported
