
Commit 59daf91

jose-torres authored and zsxwing committed
[SPARK-22733] Split StreamExecution into MicroBatchExecution and StreamExecution.
## What changes were proposed in this pull request?

StreamExecution is now an abstract base class, which MicroBatchExecution (the current StreamExecution) inherits. When continuous processing is implemented, we'll have a new ContinuousExecution implementation of StreamExecution. A few fields are also renamed to make them less microbatch-specific.

## How was this patch tested?

Refactoring only.

Author: Jose Torres <[email protected]>

Closes apache#19926 from joseph-torres/continuous-refactor.
1 parent 2fe1633 commit 59daf91
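
For orientation, here is a minimal sketch of the class hierarchy this refactor sets up. It is simplified: the real classes take SparkSession, checkpoint paths, sources, sinks, and trigger parameters in their constructors, and the abstract method's signature is abbreviated here.

```scala
// StreamExecution keeps the shared machinery (offset/commit logs, the query
// execution thread, progress reporting); subclasses supply the run loop.
abstract class StreamExecution {
  protected def runActivatedStream(): Unit // mode-specific execution loop
}

// The former StreamExecution body: plan and execute one micro-batch at a time.
class MicroBatchExecution extends StreamExecution {
  protected def runActivatedStream(): Unit = {
    // construct each batch's incremental plan, run it, commit, repeat
  }
}

// Planned follow-up (not part of this commit): long-running continuous tasks.
class ContinuousExecution extends StreamExecution {
  protected def runActivatedStream(): Unit = {
    // launch continuous-processing tasks that poll sources indefinitely
  }
}
```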

File tree: 10 files changed (+484, -429 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala

Lines changed: 4 additions & 4 deletions
@@ -42,18 +42,18 @@ import org.apache.spark.sql.SparkSession
  * line 1: version
  * line 2: metadata (optional json string)
  */
-class BatchCommitLog(sparkSession: SparkSession, path: String)
+class CommitLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[String](sparkSession, path) {
 
-  import BatchCommitLog._
+  import CommitLog._
 
   def add(batchId: Long): Unit = {
     super.add(batchId, EMPTY_JSON)
   }
 
   override def add(batchId: Long, metadata: String): Boolean = {
     throw new UnsupportedOperationException(
-      "BatchCommitLog does not take any metadata, use 'add(batchId)' instead")
+      "CommitLog does not take any metadata, use 'add(batchId)' instead")
   }
 
   override protected def deserialize(in: InputStream): String = {
@@ -76,7 +76,7 @@ class BatchCommitLog(sparkSession: SparkSession, path: String)
   }
 }
 
-object BatchCommitLog {
+object CommitLog {
   private val VERSION = 1
   private val EMPTY_JSON = "{}"
 }
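
A quick sketch of the renamed log's contract as the diff above defines it, assuming a SparkSession named `spark` is in scope; the checkpoint path is hypothetical.

```scala
// Hypothetical usage of the renamed CommitLog; the path below is made up.
val commitLog = new CommitLog(spark, "/tmp/checkpoint/commits")

commitLog.add(5L) // records batch 5 with the placeholder EMPTY_JSON metadata

// The metadata-taking overload is intentionally unsupported:
// commitLog.add(5L, "custom") // throws UnsupportedOperationException
```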

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 407 additions & 0 deletions
Large diffs are not rendered by default.

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 52 additions & 405 deletions
Large diffs are not rendered by default.

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 1 addition & 1 deletion
@@ -237,7 +237,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
-    new StreamingQueryWrapper(new StreamExecution(
+    new StreamingQueryWrapper(new MicroBatchExecution(
       sparkSession,
       userSpecifiedName.orNull,
       checkpointLocation,

sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -260,8 +260,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       CheckLastBatch((10, 5)),
       StopStream,
       AssertOnQuery { q => // purge commit and clear the sink
-        val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
-        q.batchCommitLog.purge(commit)
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
+        q.commitLog.purge(commit)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -1024,7 +1024,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       expectedCompactInterval: Int): Boolean = {
     import CompactibleFileStreamLog._
 
-    val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+    val fileSource = getSourcesFromStreamingQuery(execution).head
     val metadataLog = fileSource invokePrivate _metadataLog()
 
     if (isCompactionBatch(batchId, expectedCompactInterval)) {
@@ -1100,8 +1100,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       CheckAnswer("keep1", "keep2", "keep3"),
       AssertOnQuery("check getBatch") { execution: StreamExecution =>
         val _sources = PrivateMethod[Seq[Source]]('sources)
-        val fileSource =
-          (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
+        val fileSource = getSourcesFromStreamingQuery(execution).head
 
         def verify(startId: Option[Int], endId: Int, expected: String*): Unit = {
           val start = startId.map(new FileStreamSourceOffset(_))

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ class StreamSuite extends StreamTest {
 
     // Check the latest batchid in the commit log
     def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
-      AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
+      AssertOnQuery(_.commitLog.getLatest().get._1 == expectedId,
        s"commitLog's latest should be $expectedId")
 
     // Ensure that there has not been an incremental execution after restart

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala

Lines changed: 11 additions & 9 deletions
@@ -300,12 +300,14 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       if (currentStream != null) currentStream.committedOffsets.toString else "not started"
 
     def threadState =
-      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
-    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
-      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
-    } else {
-      ""
-    }
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) "alive" else "dead"
+
+    def threadStackTrace =
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) {
+        s"Thread stack trace: ${currentStream.queryExecutionThread.getStackTrace.mkString("\n")}"
+      } else {
+        ""
+      }
 
     def testState =
       s"""
@@ -460,7 +462,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           verify(currentStream != null, "can not stop a stream that is not running")
           try failAfter(streamingTimeout) {
             currentStream.stop()
-            verify(!currentStream.microBatchThread.isAlive,
+            verify(!currentStream.queryExecutionThread.isAlive,
               s"microbatch thread not stopped")
             verify(!currentStream.isActive,
               "query.isActive() is false even after stopping")
@@ -486,7 +488,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             currentStream.awaitTermination()
           }
           eventually("microbatch thread not stopped after termination with failure") {
-            assert(!currentStream.microBatchThread.isAlive)
+            assert(!currentStream.queryExecutionThread.isAlive)
           }
           verify(currentStream.exception === Some(thrownException),
             s"incorrect exception returned by query.exception()")
@@ -614,7 +616,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       case e: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
         failTest("Timed out waiting for stream", e)
     } finally {
-      if (currentStream != null && currentStream.microBatchThread.isAlive) {
+      if (currentStream != null && currentStream.queryExecutionThread.isAlive) {
        currentStream.stop()
      }
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -300,7 +300,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
-        q.batchCommitLog.purge(3)
+        q.commitLog.purge(3)
         // advance by a minute i.e., 90 seconds total
         clock.advance(60 * 1000L)
         true
@@ -352,7 +352,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest
       StopStream,
       AssertOnQuery { q => // clear the sink
         q.sink.asInstanceOf[MemorySink].clear()
-        q.batchCommitLog.purge(3)
+        q.commitLog.purge(3)
         // advance by 60 days i.e., 90 days total
         clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
         true

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 2 additions & 2 deletions
@@ -173,12 +173,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       StopStream, // clears out StreamTest state
       AssertOnQuery { q =>
         // both commit log and offset log contain the same (latest) batch id
-        q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
+        q.commitLog.getLatest().map(_._1).getOrElse(-1L) ==
           q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
       },
       AssertOnQuery { q =>
         // blow away commit log and sink result
-        q.batchCommitLog.purge(1)
+        q.commitLog.purge(1)
         q.sink.asInstanceOf[MemorySink].clear()
         true
       },
