Skip to content

Commit 10c72fa

Browse files
committed
fix: update MemoryStream initialization to use 'spark' instead of 'sqlContext' in StreamingQueryListenerSuite
1 parent 243f49e commit 10c72fa

File tree

1 file changed

+5
-8
lines changed

1 file changed

+5
-8
lines changed

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
624624
/** Collects events from the StreamingQueryListener for testing */
625625
testQuietly("QueryExecutionStart event fires only when batch will execute") {
626626
val clock = new StreamManualClock
627-
val inputData = new MemoryStream[Int](0, sqlContext)
627+
val inputData = new MemoryStream[Int](0, spark)
628628
val df = inputData.toDS().as[Long].map { 10 / _ }
629629
val listener = new EventCollectorV3
630630

@@ -663,7 +663,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
663663

664664
testQuietly("QueryExecutionStart event disabled by default") {
665665
val clock = new StreamManualClock
666-
val inputData = new MemoryStream[Int](0, sqlContext)
666+
val inputData = new MemoryStream[Int](0, spark)
667667
val df = inputData.toDS().as[Long].map { _ + 1 }
668668
val listener = new EventCollectorV3
669669

@@ -698,7 +698,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
698698

699699
testQuietly("QueryExecutionStart event respects throttling interval") {
700700
val clock = new StreamManualClock
701-
val inputData = new MemoryStream[Int](0, sqlContext)
701+
val inputData = new MemoryStream[Int](0, spark)
702702
val df = inputData.toDS().as[Long].map { _ + 1 }
703703
val listener = new EventCollectorV3
704704

@@ -756,7 +756,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
756756

757757
testQuietly("QueryExecutionStart event fires before QueryProgress") {
758758
val clock = new StreamManualClock
759-
val inputData = new MemoryStream[Int](0, sqlContext)
759+
val inputData = new MemoryStream[Int](0, spark)
760760
val df = inputData.toDS().as[Long].map { _ + 1 }
761761
val listener = new EventCollectorV3
762762

@@ -789,7 +789,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
789789

790790
testQuietly("QueryExecutionStart event contains correct timestamp") {
791791
val clock = new StreamManualClock
792-
val inputData = new MemoryStream[Int](0, sqlContext)
792+
val inputData = new MemoryStream[Int](0, spark)
793793
val df = inputData.toDS().as[Long].map { _ + 1 }
794794
val listener = new EventCollectorV3
795795

@@ -933,9 +933,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
933933

934934
override def onQueryProgress(event: QueryProgressEvent): Unit = handleOnQueryProgress(event)
935935

936-
override def onQueryExecutionStart(event: QueryExecutionStartEvent): Unit =
937-
handleOnQueryExecutionStart(event)
938-
939936
override def onQueryIdle(event: QueryIdleEvent): Unit = handleOnQueryIdle(event)
940937

941938
override def onQueryTerminated(event: QueryTerminatedEvent): Unit =

0 commit comments

Comments
 (0)