Skip to content

Commit bc11146

Browse files
mgaido91 authored and cloud-fan committed
[SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD
## What changes were proposed in this pull request?

When a `union` is invoked on several RDDs of which one is an empty RDD, the result of the operation is a `UnionRDD`. This causes an unneeded extra shuffle when all the other RDDs have the same partitioning. This PR ignores incoming empty RDDs in the `union` method.

## How was this patch tested?

Added a unit test.

Author: Marco Gaido <[email protected]>

Closes apache#21333 from mgaido91/SPARK-23778.
1 parent bc0498d commit bc11146

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1306,11 +1306,12 @@ class SparkContext(config: SparkConf) extends Logging {
13061306

13071307
/** Build the union of a list of RDDs. */
13081308
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
1309-
val partitioners = rdds.flatMap(_.partitioner).toSet
1310-
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
1311-
new PartitionerAwareUnionRDD(this, rdds)
1309+
val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)
1310+
val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSet
1311+
if (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
1312+
new PartitionerAwareUnionRDD(this, nonEmptyRdds)
13121313
} else {
1313-
new UnionRDD(this, rdds)
1314+
new UnionRDD(this, nonEmptyRdds)
13141315
}
13151316
}
13161317

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -154,6 +154,16 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
154154
}
155155
}
156156

157+
test("SPARK-23778: empty RDD in union should not produce a UnionRDD") {
158+
val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
159+
val emptyRDD = sc.emptyRDD[(Int, Boolean)]
160+
val unionRDD = sc.union(emptyRDD, rddWithPartitioner)
161+
assert(unionRDD.isInstanceOf[PartitionerAwareUnionRDD[_]])
162+
val unionAllEmptyRDD = sc.union(emptyRDD, emptyRDD)
163+
assert(unionAllEmptyRDD.isInstanceOf[UnionRDD[_]])
164+
assert(unionAllEmptyRDD.collect().isEmpty)
165+
}
166+
157167
test("partitioner aware union") {
158168
def makeRDDWithPartitioner(seq: Seq[Int]): RDD[Int] = {
159169
sc.makeRDD(seq, 1)
@@ -1047,7 +1057,9 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
10471057
private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) {
10481058
private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty
10491059
override def compute(p: Partition, c: TaskContext): Iterator[T] = Iterator.empty
1050-
override def getPartitions: Array[Partition] = Array.empty
1060+
override def getPartitions: Array[Partition] = Array(new Partition {
1061+
override def index: Int = 0
1062+
})
10511063
override def getDependencies: Seq[Dependency[_]] = mutableDependencies
10521064
def addDependency(dep: Dependency[_]) {
10531065
mutableDependencies += dep

0 commit comments

Comments (0)