Skip to content

Commit dc8a6be

Browse files
cloud-fan authored and gatorsmile committed
[SPARK-24588][SS] streaming join should require HashClusteredDistribution from children
## What changes were proposed in this pull request?

In apache#19080 we simplified the distribution/partitioning framework and made all the join-like operators require `HashClusteredDistribution` from their children. Unfortunately, the streaming join operator was missed. This can cause wrong results. Consider:

```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```

The physical plan is

```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```

The left table is hash partitioned by `a, b`, while the right table is hash partitioned only by `b`. This means that two matching records may end up in different partitions, so a row that should be in the output is missing from it.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes apache#21587 from cloud-fan/join.
1 parent b9a6f74 commit dc8a6be

File tree

5 files changed

+217
-59
lines changed

5 files changed

+217
-59
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,19 @@ case class ClusteredDistribution(
9999
* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
100100
* number of partitions, this distribution strictly requires which partition the tuple should be in.
101101
*/
102-
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
102+
case class HashClusteredDistribution(
103+
expressions: Seq[Expression],
104+
requiredNumPartitions: Option[Int] = None) extends Distribution {
103105
require(
104106
expressions != Nil,
105-
"The expressions for hash of a HashPartitionedDistribution should not be Nil. " +
107+
"The expressions for hash of a HashClusteredDistribution should not be Nil. " +
106108
"An AllTuples should be used to represent a distribution that only has " +
107109
"a single partition.")
108110

109-
override def requiredNumPartitions: Option[Int] = None
110-
111111
override def createPartitioning(numPartitions: Int): Partitioning = {
112+
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
113+
s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
114+
s"the actual number of partitions is $numPartitions.")
112115
HashPartitioning(expressions, numPartitions)
113116
}
114117
}
@@ -163,11 +166,22 @@ trait Partitioning {
163166
* i.e. the current dataset does not need to be re-partitioned for the `required`
164167
* Distribution (it is possible that tuples within a partition need to be reorganized).
165168
*
169+
* A [[Partitioning]] can never satisfy a [[Distribution]] if its `numPartitions` doesn't match
170+
* [[Distribution.requiredNumPartitions]].
171+
*/
172+
final def satisfies(required: Distribution): Boolean = {
173+
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
174+
}
175+
176+
/**
177+
* The actual method that defines whether this [[Partitioning]] can satisfy the given
178+
* [[Distribution]], after the `numPartitions` check.
179+
*
166180
* By default a [[Partitioning]] can satisfy [[UnspecifiedDistribution]], and [[AllTuples]] if
167-
* the [[Partitioning]] only have one partition. Implementations can overwrite this method with
168-
* special logic.
181+
* the [[Partitioning]] only has one partition. Implementations can also override this method
182+
* with special logic.
169183
*/
170-
def satisfies(required: Distribution): Boolean = required match {
184+
protected def satisfies0(required: Distribution): Boolean = required match {
171185
case UnspecifiedDistribution => true
172186
case AllTuples => numPartitions == 1
173187
case _ => false
@@ -186,9 +200,8 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning
186200
case object SinglePartition extends Partitioning {
187201
val numPartitions = 1
188202

189-
override def satisfies(required: Distribution): Boolean = required match {
203+
override def satisfies0(required: Distribution): Boolean = required match {
190204
case _: BroadcastDistribution => false
191-
case ClusteredDistribution(_, Some(requiredNumPartitions)) => requiredNumPartitions == 1
192205
case _ => true
193206
}
194207
}
@@ -205,16 +218,15 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
205218
override def nullable: Boolean = false
206219
override def dataType: DataType = IntegerType
207220

208-
override def satisfies(required: Distribution): Boolean = {
209-
super.satisfies(required) || {
221+
override def satisfies0(required: Distribution): Boolean = {
222+
super.satisfies0(required) || {
210223
required match {
211224
case h: HashClusteredDistribution =>
212225
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
213226
case (l, r) => l.semanticEquals(r)
214227
}
215-
case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
216-
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
217-
(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
228+
case ClusteredDistribution(requiredClustering, _) =>
229+
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
218230
case _ => false
219231
}
220232
}
@@ -246,15 +258,14 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
246258
override def nullable: Boolean = false
247259
override def dataType: DataType = IntegerType
248260

249-
override def satisfies(required: Distribution): Boolean = {
250-
super.satisfies(required) || {
261+
override def satisfies0(required: Distribution): Boolean = {
262+
super.satisfies0(required) || {
251263
required match {
252264
case OrderedDistribution(requiredOrdering) =>
253265
val minSize = Seq(requiredOrdering.size, ordering.size).min
254266
requiredOrdering.take(minSize) == ordering.take(minSize)
255-
case ClusteredDistribution(requiredClustering, requiredNumPartitions) =>
256-
ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) &&
257-
(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions)
267+
case ClusteredDistribution(requiredClustering, _) =>
268+
ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x)))
258269
case _ => false
259270
}
260271
}
@@ -295,7 +306,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
295306
* Returns true if any `partitioning` of this collection satisfies the given
296307
* [[Distribution]].
297308
*/
298-
override def satisfies(required: Distribution): Boolean =
309+
override def satisfies0(required: Distribution): Boolean =
299310
partitionings.exists(_.satisfies(required))
300311

301312
override def toString: String = {
@@ -310,7 +321,7 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
310321
case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
311322
override val numPartitions: Int = 1
312323

313-
override def satisfies(required: Distribution): Boolean = required match {
324+
override def satisfies0(required: Distribution): Boolean = required match {
314325
case BroadcastDistribution(m) if m == mode => true
315326
case _ => false
316327
}

0 commit comments

Comments
 (0)