@@ -27,6 +27,7 @@ import scala.util.control.ControlThrowable
 import com.google.common.util.concurrent.UncheckedExecutionException
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -35,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -513,6 +515,120 @@ class StreamSuite extends StreamTest {
     }
   }
 
+ test(" explain-continuous" ) {
519
+ val inputData = ContinuousMemoryStream [Int ]
520
+ val df = inputData.toDS().map(_ * 2 ).filter(_ > 5 )
521
+
522
+ // Test `df.explain`
523
+ val explain = ExplainCommand (df.queryExecution.logical, extended = false )
524
+ val explainString =
525
+ spark.sessionState
526
+ .executePlan(explain)
527
+ .executedPlan
528
+ .executeCollect()
529
+ .map(_.getString(0 ))
530
+ .mkString(" \n " )
531
+ assert(explainString.contains(" Filter" ))
532
+ assert(explainString.contains(" MapElements" ))
533
+ assert(! explainString.contains(" LocalTableScan" ))
534
+
535
+ // Test StreamingQuery.display
536
+ val q = df.writeStream.queryName(" memory_continuous_explain" )
537
+ .outputMode(OutputMode .Update ()).format(" memory" )
538
+ .trigger(Trigger .Continuous (" 1 seconds" ))
539
+ .start()
540
+ .asInstanceOf [StreamingQueryWrapper ]
541
+ .streamingQuery
542
+ try {
543
+ // in continuous mode, the query will be run even there's no data
544
+ // sleep a bit to ensure initialization
545
+ eventually(timeout(2 .seconds), interval(100 .milliseconds)) {
546
+ assert(q.lastExecution != null )
547
+ }
548
+
549
+ val explainWithoutExtended = q.explainInternal(false )
550
+
551
+ // `extended = false` only displays the physical plan.
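+      // In the physical plan, the logical `Streaming RelationV2` node has been
+      // planned into a `ScanV2` node, so only the latter should appear.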
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithoutExtended).size === 1)
+
+      val explainWithExtended = q.explainInternal(true)
+      // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and
+      // 1 physical plan.
+      assert("Streaming RelationV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 3)
+      assert("ScanV2 ContinuousMemoryStream".r
+        .findAllMatchIn(explainWithExtended).size === 1)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-microbatch") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_microbatch_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.ProcessingTime("1 seconds"))
+      .start()
+
+    try {
+      import org.apache.spark.sql.execution.debug._
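+      // No batch has run yet, so there is no physical plan to extract generated code from.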
+      assert("No physical plan. Waiting for data." === codegenString(q))
+      assert(codegenStringSeq(q).isEmpty)
+
+      inputData.addData(1, 2, 3, 4, 5)
+      q.processAllAvailable()
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("codegen-continuous") {
+    val inputData = ContinuousMemoryStream[Int]
+    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
+
+    // Test StreamingQuery.codegen
+    val q = df.writeStream.queryName("memory_continuous_codegen")
+      .outputMode(OutputMode.Update)
+      .format("memory")
+      .trigger(Trigger.Continuous("1 seconds"))
+      .start()
+
+    try {
+      // In continuous mode, the query runs even if there is no data.
+      // Sleep a bit to ensure initialization.
+      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+        assert(q.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution != null)
+      }
+
+      assertDebugCodegenResult(q)
+    } finally {
+      q.stop()
+    }
+  }
+
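+  // Shared check: the map/filter pipeline should collapse into a single
+  // whole-stage-codegen subtree with generated code attached to it.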
+  private def assertDebugCodegenResult(query: StreamingQuery): Unit = {
+    import org.apache.spark.sql.execution.debug._
+
+    val codegenStr = codegenString(query)
+    assert(codegenStr.contains("Found 1 WholeStageCodegen subtrees."))
+    // assuming that code is generated for the test query
+    assert(codegenStr.contains("Generated code:"))
+
+    val codegenStrSeq = codegenStringSeq(query)
+    assert(codegenStrSeq.nonEmpty)
+    assert(codegenStrSeq.head._1.contains("*(1)"))
+    assert(codegenStrSeq.head._2.contains("codegenStageId=1"))
+  }
+
   test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
     withTempPath { testPath =>
       val data = Seq((1, 2), (2, 3), (3, 4))