
Commit 054ddb2

jose-torres authored and zsxwing committed
[SPARK-21988] Add default stats to StreamingExecutionRelation.
## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

Existing unit tests, plus an explain() test to be sure.

Author: Jose Torres <[email protected]>

Closes apache#19212 from joseph-torres/SPARK-21988.
1 parent ddd7f5e commit 054ddb2
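For context, the failure mode this patch guards against shows up in stream-batch joins. As I understand it, `StreamingExecutionRelation` previously inherited `LeafNode`'s default `computeStats`, which throws `UnsupportedOperationException`, so any operation that asks the analyzed plan for statistics (as `explain()` can) could fail. The repro below is an illustrative sketch, not code from the patch; the session setup and local master are assumptions.

// Hypothetical repro sketch. With this patch, explain() succeeds because the
// streaming leaf node reports the session default size instead of throwing.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("SPARK-21988-repro").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val stream = MemoryStream[Int]
val static = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")

// Planning the join asks each leaf for a size estimate.
stream.toDF().join(static, static("number") === $"value").explain()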

File tree

5 files changed: +39 -7 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 1 addition & 1 deletion
@@ -166,7 +166,7 @@ class StreamExecution(
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
           // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
+          StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
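A note on the shape of this change: `sparkSession` is passed in a second (curried) parameter list. In Scala, only the first parameter list of a case class participates in the generated `equals`, `hashCode`, and pattern matching, so two relations over the same source and output still compare equal regardless of which session created them. A minimal standalone sketch; the `Rel` class is hypothetical:

// Hypothetical case class mirroring the pattern used above: the second
// parameter list is excluded from equality and pattern matching.
case class Rel(name: String)(val session: String)

val a = Rel("r")("sessionA")
val b = Rel("r")("sessionB")
assert(a == b)        // equal: the session is ignored by the generated equals
val Rel(n) = a        // pattern matching sees only the first parameter list
println(n)            // prints "r"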

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala

Lines changed: 17 additions & 3 deletions
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming

 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource

@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * Used to link a streaming [[Source]] of data into a
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
-case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+case class StreamingExecutionRelation(
+    source: Source,
+    output: Seq[Attribute])(session: SparkSession)
+  extends LeafNode {
+
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
 }

 /**
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 }

 object StreamingExecutionRelation {
-  def apply(source: Source): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)
+  def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
   }
 }
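Why the session default is a safe estimate: `spark.sql.defaultSizeInBytes` falls back to `Long.MaxValue` unless explicitly set, and the optimizer only broadcasts a join side whose estimated size falls under `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). A maximal estimate therefore means the unbounded streaming side is never chosen for broadcast. A small sketch, assuming both configs are registered SQL configs whose defaults `RuntimeConfig.get` can resolve:

// Sketch: inspect the two configs that interact with this size estimate.
// The expected values in the comments reflect my understanding of the
// Spark 2.2-era defaults, not output quoted from the patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("stats-sketch").getOrCreate()

println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) // expected: 10485760 (10 MB)
println(spark.conf.get("spark.sql.defaultSizeInBytes"))         // expected: Long.MaxValue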

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ object MemoryStream {
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
   protected val output = logicalPlan.output

   /**
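For reference, `MemoryStream` is the in-memory test source used throughout these suites; its logical plan now carries the session it derives from the implicit `SQLContext` it is constructed with. A minimal end-to-end usage sketch; the query name and local master are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("memory-stream").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext  // picked up by MemoryStream.apply

val input = MemoryStream[Int]               // logicalPlan is built with sqlContext.sparkSession
input.addData(1, 2, 3)

val query = input.toDF().writeStream
  .format("memory").queryName("sketch").outputMode("append").start()
query.processAllAvailable()
spark.table("sketch").show()                // rows 1, 2, 3
query.stop()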

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 19 additions & 1 deletion
@@ -76,6 +76,22 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }

+
+  test("explain join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      joined.explain()
+    }
+    assert(outputStream.toString.contains("StreamingRelation"))
+  }
+
   test("SPARK-20432: union one stream with itself") {
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
     val unioned = df.union(df)

@@ -337,7 +353,9 @@ class StreamSuite extends StreamTest {

     override def stop(): Unit = {}
   }
-  val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+  val df = Dataset[Int](
+    sqlContext.sparkSession,
+    StreamingExecutionRelation(source, sqlContext.sparkSession))
   testStream(df)(
     // `ExpectFailure(isFatalError = true)` verifies two things:
     // - Fatal errors can be propagated to `StreamingQuery.exception` and
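The new test leans on a plain Scala idiom: `Console.withOut` temporarily reroutes `println`-based output, which is how `Dataset.explain()` reports its plan, into a buffer the assertion can inspect. In isolation:

// Standalone sketch of the capture idiom (no Spark involved); the printed
// string is a stand-in for the plan text joined.explain() would emit.
val buffer = new java.io.ByteArrayOutputStream()
Console.withOut(buffer) {
  println("== Physical Plan ==")
}
assert(buffer.toString.contains("Physical Plan"))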

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
       override def stop(): Unit = {}
     }
-    StreamingExecutionRelation(source)
+    StreamingExecutionRelation(source, spark)
   }

   /** Returns the query progress at the end of the first trigger of streaming DF */
