Skip to content

Commit 71bd796

Browse files
cloud-fanjiangxb1987
authored andcommitted
[SPARK-23243][CORE] Fix RDD.repartition() data correctness issue
## What changes were proposed in this pull request? An alternative fix for apache#21698 When Spark rerun tasks for an RDD, there are 3 different behaviors: 1. determinate. Always return the same result with same order when rerun. 2. unordered. Returns same data set in random order when rerun. 3. indeterminate. Returns different result when rerun. Normally Spark doesn't need to care about it. Spark runs stages one by one, when a task is failed, just rerun it. Although the rerun task may return a different result, users will not be surprised. However, Spark may rerun a finished stage when seeing fetch failures. When this happens, Spark needs to rerun all the tasks of all the succeeding stages if the RDD output is indeterminate, because the input of the succeeding stages has been changed. If the RDD output is determinate, we only need to rerun the failed tasks of the succeeding stages, because the input doesn't change. If the RDD output is unordered, it's same as determinate, because shuffle partitioner is always deterministic(round-robin partitioner is not a shuffle partitioner that extends `org.apache.spark.Partitioner`), so the reducers will still get the same input data set. This PR fixed the failure handling for `repartition`, to avoid correctness issues. For `repartition`, it applies a stateful map function to generate a round-robin id, which is order sensitive and makes the RDD's output indeterminate. When the stage contains `repartition` reruns, we must also rerun all the tasks of all the succeeding stages. **future improvement:** 1. Currently we can't rollback and rerun a shuffle map stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25341 2. Currently we can't rollback and rerun a result stage, and just fail. We should fix it later. https://issues.apache.org/jira/browse/SPARK-25342 3. We should provide public API to allow users to tag the random level of the RDD's computing function. ## How is this pull request tested? a new test case Closes apache#22112 from cloud-fan/repartition. Lead-authored-by: Wenchen Fan <[email protected]> Co-authored-by: Xingbo Jiang <[email protected]> Signed-off-by: Xiao Li <[email protected]>
1 parent 559b899 commit 71bd796

File tree

6 files changed

+345
-17
lines changed

6 files changed

+345
-17
lines changed

core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import org.apache.spark.util.random.SamplingUtils
3333
/**
3434
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
3535
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
36+
*
37+
* Note that, partitioner must be deterministic, i.e. it must return the same partition id given
38+
* the same partition key.
3639
*/
3740
abstract class Partitioner extends Serializable {
3841
def numPartitions: Int

core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,16 @@ import org.apache.spark.{Partition, TaskContext}
3232
* doesn't modify the keys.
3333
* @param isFromBarrier Indicates whether this RDD is transformed from an RDDBarrier, a stage
3434
* containing at least one RDDBarrier shall be turned into a barrier stage.
35+
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
36+
* sensitive, it may return totally different result when the input order
37+
* is changed. Mostly stateful functions are order-sensitive.
3538
*/
3639
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
3740
var prev: RDD[T],
3841
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
3942
preservesPartitioning: Boolean = false,
40-
isFromBarrier: Boolean = false)
43+
isFromBarrier: Boolean = false,
44+
isOrderSensitive: Boolean = false)
4145
extends RDD[U](prev) {
4246

4347
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
@@ -54,4 +58,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
5458

5559
@transient protected lazy override val isBarrier_ : Boolean =
5660
isFromBarrier || dependencies.exists(_.rdd.isBarrier())
61+
62+
override protected def getOutputDeterministicLevel = {
63+
if (isOrderSensitive && prev.outputDeterministicLevel == DeterministicLevel.UNORDERED) {
64+
DeterministicLevel.INDETERMINATE
65+
} else {
66+
super.getOutputDeterministicLevel
67+
}
68+
}
5769
}

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -462,8 +462,9 @@ abstract class RDD[T: ClassTag](
462462

463463
// include a shuffle step so that our upstream tasks are still distributed
464464
new CoalescedRDD(
465-
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
466-
new HashPartitioner(numPartitions)),
465+
new ShuffledRDD[Int, T, T](
466+
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
467+
new HashPartitioner(numPartitions)),
467468
numPartitions,
468469
partitionCoalescer).values
469470
} else {
@@ -807,16 +808,21 @@ abstract class RDD[T: ClassTag](
807808
* serializable and don't require closure cleaning.
808809
*
809810
* @param preservesPartitioning indicates whether the input function preserves the partitioner,
810-
* which should be `false` unless this is a pair RDD and the input function doesn't modify
811-
* the keys.
811+
* which should be `false` unless this is a pair RDD and the input
812+
* function doesn't modify the keys.
813+
* @param isOrderSensitive whether or not the function is order-sensitive. If it's order
814+
* sensitive, it may return totally different result when the input order
815+
* is changed. Mostly stateful functions are order-sensitive.
812816
*/
813817
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
814818
f: (Int, Iterator[T]) => Iterator[U],
815-
preservesPartitioning: Boolean = false): RDD[U] = withScope {
819+
preservesPartitioning: Boolean = false,
820+
isOrderSensitive: Boolean = false): RDD[U] = withScope {
816821
new MapPartitionsRDD(
817822
this,
818823
(context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
819-
preservesPartitioning)
824+
preservesPartitioning = preservesPartitioning,
825+
isOrderSensitive = isOrderSensitive)
820826
}
821827

822828
/**
@@ -1636,6 +1642,16 @@ abstract class RDD[T: ClassTag](
16361642
}
16371643
}
16381644

1645+
/**
1646+
* Return whether this RDD is reliably checkpointed and materialized.
1647+
*/
1648+
private[rdd] def isReliablyCheckpointed: Boolean = {
1649+
checkpointData match {
1650+
case Some(reliable: ReliableRDDCheckpointData[_]) if reliable.isCheckpointed => true
1651+
case _ => false
1652+
}
1653+
}
1654+
16391655
/**
16401656
* Gets the name of the directory to which this RDD was checkpointed.
16411657
* This is not defined if the RDD is checkpointed locally.
@@ -1873,6 +1889,63 @@ abstract class RDD[T: ClassTag](
18731889
// RDD chain.
18741890
@transient protected lazy val isBarrier_ : Boolean =
18751891
dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier())
1892+
1893+
/**
1894+
* Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
1895+
* for the definition.
1896+
*
1897+
* By default, an reliably checkpointed RDD, or RDD without parents(root RDD) is DETERMINATE. For
1898+
* RDDs with parents, we will generate a deterministic level candidate per parent according to
1899+
* the dependency. The deterministic level of the current RDD is the deterministic level
1900+
* candidate that is deterministic least. Please override [[getOutputDeterministicLevel]] to
1901+
* provide custom logic of calculating output deterministic level.
1902+
*/
1903+
// TODO: make it public so users can set deterministic level to their custom RDDs.
1904+
// TODO: this can be per-partition. e.g. UnionRDD can have different deterministic level for
1905+
// different partitions.
1906+
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {
1907+
if (isReliablyCheckpointed) {
1908+
DeterministicLevel.DETERMINATE
1909+
} else {
1910+
getOutputDeterministicLevel
1911+
}
1912+
}
1913+
1914+
@DeveloperApi
1915+
protected def getOutputDeterministicLevel: DeterministicLevel.Value = {
1916+
val deterministicLevelCandidates = dependencies.map {
1917+
// The shuffle is not really happening, treat it like narrow dependency and assume the output
1918+
// deterministic level of current RDD is same as parent.
1919+
case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
1920+
dep.rdd.outputDeterministicLevel
1921+
1922+
case dep: ShuffleDependency[_, _, _] =>
1923+
if (dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
1924+
// If map output was indeterminate, shuffle output will be indeterminate as well
1925+
DeterministicLevel.INDETERMINATE
1926+
} else if (dep.keyOrdering.isDefined && dep.aggregator.isDefined) {
1927+
// if aggregator specified (and so unique keys) and key ordering specified - then
1928+
// consistent ordering.
1929+
DeterministicLevel.DETERMINATE
1930+
} else {
1931+
// In Spark, the reducer fetches multiple remote shuffle blocks at the same time, and
1932+
// the arrival order of these shuffle blocks are totally random. Even if the parent map
1933+
// RDD is DETERMINATE, the reduce RDD is always UNORDERED.
1934+
DeterministicLevel.UNORDERED
1935+
}
1936+
1937+
// For narrow dependency, assume the output deterministic level of current RDD is same as
1938+
// parent.
1939+
case dep => dep.rdd.outputDeterministicLevel
1940+
}
1941+
1942+
if (deterministicLevelCandidates.isEmpty) {
1943+
// By default we assume the root RDD is determinate.
1944+
DeterministicLevel.DETERMINATE
1945+
} else {
1946+
deterministicLevelCandidates.maxBy(_.id)
1947+
}
1948+
}
18761949
}
18771950

18781951

@@ -1926,3 +1999,18 @@ object RDD {
19261999
new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
19272000
}
19282001
}
2002+
2003+
/**
2004+
* The deterministic level of RDD's output (i.e. what `RDD#compute` returns). This explains how
2005+
* the output will diff when Spark reruns the tasks for the RDD. There are 3 deterministic levels:
2006+
* 1. DETERMINATE: The RDD output is always the same data set in the same order after a rerun.
2007+
* 2. UNORDERED: The RDD output is always the same data set but the order can be different
2008+
* after a rerun.
2009+
* 3. INDETERMINATE. The RDD output can be different after a rerun.
2010+
*
2011+
* Note that, the output of an RDD usually relies on the parent RDDs. When the parent RDD's output
2012+
* is INDETERMINATE, it's very likely the RDD's output is also INDETERMINATE.
2013+
*/
2014+
private[spark] object DeterministicLevel extends Enumeration {
2015+
val DETERMINATE, UNORDERED, INDETERMINATE = Value
2016+
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.spark.internal.Logging
4040
import org.apache.spark.internal.config
4141
import org.apache.spark.network.util.JavaUtils
4242
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
43-
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
43+
import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
4444
import org.apache.spark.rpc.RpcTimeout
4545
import org.apache.spark.storage._
4646
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1487,6 +1487,63 @@ private[spark] class DAGScheduler(
14871487
failedStages += failedStage
14881488
failedStages += mapStage
14891489
if (noResubmitEnqueued) {
1490+
// If the map stage is INDETERMINATE, which means the map tasks may return
1491+
// different result when re-try, we need to re-try all the tasks of the failed
1492+
// stage and its succeeding stages, because the input data will be changed after the
1493+
// map tasks are re-tried.
1494+
// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is
1495+
// guaranteed to be determinate, so the input data of the reducers will not change
1496+
// even if the map tasks are re-tried.
1497+
if (mapStage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE) {
1498+
// It's a little tricky to find all the succeeding stages of `failedStage`, because
1499+
// each stage only know its parents not children. Here we traverse the stages from
1500+
// the leaf nodes (the result stages of active jobs), and rollback all the stages
1501+
// in the stage chains that connect to the `failedStage`. To speed up the stage
1502+
// traversing, we collect the stages to rollback first. If a stage needs to
1503+
// rollback, all its succeeding stages need to rollback to.
1504+
val stagesToRollback = scala.collection.mutable.HashSet(failedStage)
1505+
1506+
def collectStagesToRollback(stageChain: List[Stage]): Unit = {
1507+
if (stagesToRollback.contains(stageChain.head)) {
1508+
stageChain.drop(1).foreach(s => stagesToRollback += s)
1509+
} else {
1510+
stageChain.head.parents.foreach { s =>
1511+
collectStagesToRollback(s :: stageChain)
1512+
}
1513+
}
1514+
}
1515+
1516+
def generateErrorMessage(stage: Stage): String = {
1517+
"A shuffle map stage with indeterminate output was failed and retried. " +
1518+
s"However, Spark cannot rollback the $stage to re-process the input data, " +
1519+
"and has to fail this job. Please eliminate the indeterminacy by " +
1520+
"checkpointing the RDD before repartition and try again."
1521+
}
1522+
1523+
activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil))
1524+
1525+
stagesToRollback.foreach {
1526+
case mapStage: ShuffleMapStage =>
1527+
val numMissingPartitions = mapStage.findMissingPartitions().length
1528+
if (numMissingPartitions < mapStage.numTasks) {
1529+
// TODO: support to rollback shuffle files.
1530+
// Currently the shuffle writing is "first write wins", so we can't re-run a
1531+
// shuffle map stage and overwrite existing shuffle files. We have to finish
1532+
// SPARK-8029 first.
1533+
abortStage(mapStage, generateErrorMessage(mapStage), None)
1534+
}
1535+
1536+
case resultStage: ResultStage if resultStage.activeJob.isDefined =>
1537+
val numMissingPartitions = resultStage.findMissingPartitions().length
1538+
if (numMissingPartitions < resultStage.numTasks) {
1539+
// TODO: support to rollback result tasks.
1540+
abortStage(resultStage, generateErrorMessage(resultStage), None)
1541+
}
1542+
1543+
case _ =>
1544+
}
1545+
}
1546+
14901547
// We expect one executor failure to trigger many FetchFailures in rapid succession,
14911548
// but all of those task failures can typically be handled by a single resubmission of
14921549
// the failed stage. We avoid flooding the scheduler's event queue with resubmit

0 commit comments

Comments
 (0)