Commit f32a842
[SPARK-22053][SS] Stream-stream inner join in Append Mode
## What changes were proposed in this pull request?

#### Architecture

This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.

1. For each stream, we maintain the past rows as state in the State Store.
   - For each joining key, there can be multiple rows that have been received.
   - So we effectively have to maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream:
   - Look up the other stream's state to see if there are matching rows, and output them if they satisfy the join condition.
   - Add the input row to the corresponding stream's state.
   - If the data has a timestamp/window column with a watermark, use it to calculate the threshold for keys that must be buffered for future matches, and drop the rest from the state.

Cleaning up old, unnecessary state rows depends entirely on whether a watermark has been defined and what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common.

- Queries with time-range conditions, e.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key, e.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)

#### Implementation

The stream-stream join is primarily implemented in three classes:

- `StreamingSymmetricHashJoinExec` implements the symmetric join algorithm described above.
- `SymmetricHashJoinStateManager` manages the streaming state for the join. It is essentially a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinHelper` is a helper class that extracts the state thresholds from the join conditions and the event-time watermark.

Refer to the scaladocs of these classes for more implementation details.

Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are:

- Allowed inner joins in Append mode in UnsupportedOperationChecker.
- Prevented stream-stream joins on an empty batch DataFrame from being collapsed by the optimizer.

## How was this patch tested?

- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationsSuite

Author: Tathagata Das <[email protected]>

Closes apache#19271 from tdas/SPARK-22053.
1 parent a8a5cd2 commit f32a842
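
As a concrete illustration of the two query shapes described in the commit message, here is a minimal sketch against the DataFrame API. The `impressions`/`clicks` streams, the rate source, and all column names are illustrative assumptions, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, window}

val spark = SparkSession.builder.appName("stream-stream-join-sketch").getOrCreate()
import spark.implicits._

// Two hypothetical event-time streams (any source with a timestamp column works).
val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS impressionAdId", "timestamp AS impressionTime")
val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS clickAdId", "timestamp AS clickTime")

// Shape 1: time-range condition. The watermarks plus the range predicate let
// the engine derive per-side state watermarks and drop unmatchable rows.
val timeRangeJoin = impressions.withWatermark("impressionTime", "10 minutes")
  .join(
    clicks.withWatermark("clickTime", "10 minutes"),
    expr("""
      clickAdId = impressionAdId AND
      clickTime > impressionTime - interval 8 minutes AND
      clickTime < impressionTime + interval 1 hour"""))

// Shape 2: window as part of the matching key. The window struct inherits the
// watermark metadata (see the Analyzer.scala change below), enabling cleanup.
val windowJoin = impressions.withWatermark("impressionTime", "10 minutes")
  .withColumn("impressionWindow", window($"impressionTime", "1 hour"))
  .join(
    clicks.withWatermark("clickTime", "10 minutes")
      .withColumn("clickWindow", window($"clickTime", "1 hour")),
    expr("clickAdId = impressionAdId AND clickWindow = impressionWindow"))
```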

18 files changed, +1940 -45 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
@@ -2390,7 +2390,7 @@ object TimeWindowing extends Rule[LogicalPlan] {

       if (window.windowDuration == window.slideDuration) {
         val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
-          exprId = windowAttr.exprId)
+          exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

         val replacedPlan = p transformExpressions {
           case t: TimeWindow => windowAttr

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 4 additions & 3 deletions
@@ -222,16 +222,17 @@ object UnsupportedOperationChecker {
         joinType match {

           case _: InnerLike =>
-            if (left.isStreaming && right.isStreaming) {
-              throwError("Inner join between two streaming DataFrames/Datasets is not supported")
+            if (left.isStreaming && right.isStreaming &&
+                outputMode != InternalOutputModes.Append) {
+              throwError("Inner join between two streaming DataFrames/Datasets is not supported" +
+                s" in ${outputMode} output mode, only in Append output mode")
             }

           case FullOuter =>
             if (left.isStreaming || right.isStreaming) {
               throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
             }

-
           case LeftOuter | LeftSemi | LeftAnti =>
             if (right.isStreaming) {
               throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala

Lines changed: 8 additions & 1 deletion
@@ -164,7 +164,14 @@ case class Alias(child: Expression, name: String)(
     }
   }

-  override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix"
+  /** Used to signal the column used to calculate an eventTime watermark (e.g. a#1-T{delayMs}) */
+  private def delaySuffix = if (metadata.contains(EventTimeWatermark.delayKey)) {
+    s"-T${metadata.getLong(EventTimeWatermark.delayKey)}ms"
+  } else {
+    ""
+  }
+
+  override def toString: String = s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix"

   override protected final def otherCopyArgs: Seq[AnyRef] = {
     exprId :: qualifier :: explicitMetadata :: Nil
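
A hedged illustration of the new suffix (the stream and the 10-second delay are made up): `withWatermark` records the delay in milliseconds in the column metadata under `EventTimeWatermark.delayKey`, and plan strings then tag the watermarked column accordingly.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// "10 seconds" is stored as delayMs = 10000 in the attribute metadata, so the
// watermarked column renders in plan strings like `timestamp#0-T10000ms`.
val events = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "10 seconds")
events.explain()
```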

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 17 additions & 8 deletions
@@ -45,14 +45,19 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     case p: Union if p.children.forall(isEmptyLocalRelation) =>
       empty(p)

-    case p @ Join(_, _, joinType, _) if p.children.exists(isEmptyLocalRelation) => joinType match {
-      case _: InnerLike => empty(p)
-      // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
-      // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
-      case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
-      case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
-      case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
-      case _ => p
+    // Joins on empty LocalRelations generated from streaming sources are not eliminated
+    // as stateful streaming joins need to perform other state management operations other than
+    // just processing the input data.
+    case p @ Join(_, _, joinType, _)
+        if !p.children.exists(_.isStreaming) && p.children.exists(isEmptyLocalRelation) =>
+      joinType match {
+        case _: InnerLike => empty(p)
+        // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+        // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+        case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
+        case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
+        case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
+        case _ => p
     }

     case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {

@@ -74,6 +79,10 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
       //
       // If the grouping expressions are empty, however, then the aggregate will always produce a
       // single output row and thus we cannot propagate the EmptyRelation.
+      //
+      // Aggregation on empty LocalRelation generated from a streaming source is not eliminated
+      // as stateful streaming aggregation need to perform other state management operations other
+      // than just processing the input data.
       case Aggregate(ge, _, _) if ge.nonEmpty && !p.isStreaming => empty(p)
       // Generators like Hive-style UDTF may return their records within `close`.
       case Generate(_: Explode, _, _, _, _, _) => empty(p)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala

Lines changed: 19 additions & 3 deletions
@@ -383,11 +383,27 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Append
   )

-  // Inner joins: Stream-stream not supported
+  // Inner joins: Multiple stream-stream joins supported only in append mode
   testBinaryOperationInStreamingPlan(
-    "inner join",
+    "single inner join in append mode",
     _.join(_, joinType = Inner),
-    streamStreamSupported = false)
+    outputMode = Append,
+    streamStreamSupported = true)
+
+  testBinaryOperationInStreamingPlan(
+    "multiple inner joins in append mode",
+    (x: LogicalPlan, y: LogicalPlan) => {
+      x.join(y, joinType = Inner).join(streamRelation, joinType = Inner)
+    },
+    outputMode = Append,
+    streamStreamSupported = true)
+
+  testBinaryOperationInStreamingPlan(
+    "inner join in update mode",
+    _.join(_, joinType = Inner),
+    outputMode = Update,
+    streamStreamSupported = false,
+    expectedMsg = "inner join")

   // Full outer joins: only batch-batch is allowed
   testBinaryOperationInStreamingPlan(

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 19 additions & 2 deletions
@@ -18,15 +18,14 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange

@@ -257,6 +256,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

+  object StreamingJoinStrategy extends Strategy {
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+      plan match {
+        case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
+            if left.isStreaming && right.isStreaming =>
+
+          new StreamingSymmetricHashJoinExec(
+            leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+
+        case Join(left, right, _, _) if left.isStreaming && right.isStreaming =>
+          throw new AnalysisException(
+            "Stream stream joins without equality predicate is not supported", plan = Some(plan))
+
+        case _ => Nil
+      }
+    }
+  }
+
   /**
    * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
    */
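
A hedged sketch of the two paths in `StreamingJoinStrategy` from the user's side (streams and columns are illustrative): a condition containing an equality is planned as `StreamingSymmetricHashJoinExec`, while a condition with no equality leaves no key to hash-partition the state on and is rejected.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.getOrCreate()
val left = spark.readStream.format("rate").load()
  .selectExpr("value AS leftKey", "timestamp AS leftTime")
val right = spark.readStream.format("rate").load()
  .selectExpr("value AS rightKey", "timestamp AS rightTime")

// ExtractEquiJoinKeys finds leftKey = rightKey, so this is planned as
// StreamingSymmetricHashJoinExec (the range predicate stays a join condition).
val equiJoin = left.join(right,
  expr("leftKey = rightKey AND leftTime < rightTime + interval 1 hour"))

// No equality predicate at all: fails when the query is started, with
// "Stream stream joins without equality predicate is not supported".
val rangeOnlyJoin = left.join(right, expr("leftTime < rightTime"))
```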

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

Lines changed: 11 additions & 0 deletions
@@ -54,6 +54,7 @@ class IncrementalExecution(
     sparkSession.sessionState.planner.strategies

   override def extraPlanningStrategies: Seq[Strategy] =
+    StreamingJoinStrategy ::
     StatefulAggregationStrategy ::
     FlatMapGroupsWithStateStrategy ::
     StreamingRelationStrategy ::

@@ -116,6 +117,16 @@ class IncrementalExecution(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
           eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
+      case j: StreamingSymmetricHashJoinExec =>
+        j.copy(
+          stateInfo = Some(nextStatefulOperationStateInfo),
+          eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs),
+          stateWatermarkPredicates =
+            StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
+              j.left.output, j.right.output, j.leftKeys, j.rightKeys, j.condition,
+              Some(offsetSeqMetadata.batchWatermarkMs))
+        )
     }
   }
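
To make the `getStateWatermarkPredicates` step concrete, here is the cleanup arithmetic for the time-range query from the commit message, as a minimal sketch; the helper derives the actual predicates from the join condition and watermark, this function only shows the numbers involved:

```scala
/**
 * Worked example (illustrative, not the helper's API). For the condition
 *   leftTime > rightTime - INTERVAL 8 MINUTES AND
 *   leftTime < rightTime + INTERVAL 1 HOUR
 * and an event-time watermark W (no future row has event time below W),
 * compute the thresholds at or below which buffered rows can never match
 * any future input and may therefore be dropped from state.
 */
def stateCleanupThresholdsMs(watermarkMs: Long): (Long, Long) = {
  val eightMinutesMs = 8L * 60 * 1000
  val oneHourMs = 60L * 60 * 1000
  // A left row with leftTime = t can match a future right row (rightTime >= W)
  // only if t > rightTime - 8m can hold for some rightTime >= W, i.e. t > W - 8m.
  val dropLeftAtOrBelow = watermarkMs - eightMinutesMs
  // A right row with rightTime = r can match a future left row (leftTime >= W)
  // only if leftTime < r + 1h can hold for some leftTime >= W, i.e. r > W - 1h.
  val dropRightAtOrBelow = watermarkMs - oneHourMs
  (dropLeftAtOrBelow, dropRightAtOrBelow)
}
```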

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 2 additions & 0 deletions
@@ -297,6 +297,8 @@ class StreamExecution(
     val sparkSessionToRunBatches = sparkSession.cloneSession()
     // Adaptive execution can change num shuffle partitions, disallow
     sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+    // Disable cost-based join optimization as we do not want stateful operations to be rearranged
+    sparkSessionToRunBatches.conf.set(SQLConf.CBO_ENABLED.key, "false")
     offsetSeqMetadata = OffsetSeqMetadata(
       batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf)
