diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 7c7b6d5488d81..2c834b8431c67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -181,7 +181,7 @@ object PushDownUtils {
    */
   def pruneColumns(
       scanBuilder: ScanBuilder,
-      relation: DataSourceV2Relation,
+      relation: DataSourceV2RelationBase,
       projects: Seq[NamedExpression],
       filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     val exprs = projects ++ filters
@@ -208,13 +208,13 @@ object PushDownUtils {
         // the underlying table schema
         scan -> toOutputAttrs(scan.readSchema(), relation)
 
-      case _ => scanBuilder.build() -> relation.output
+      case _ => scanBuilder.build() -> relation.output.asInstanceOf[Seq[AttributeReference]]
     }
   }
 
   def toOutputAttrs(
       schema: StructType,
-      relation: DataSourceV2Relation): Seq[AttributeReference] = {
+      relation: DataSourceV2RelationBase): Seq[AttributeReference] = {
     val nameToAttr = Utils.toMap(relation.output.map(_.name), relation.output)
     val cleaned = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
     toAttributes(cleaned).map {
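// Illustration only, not part of the patch: with pruneColumns() widened to accept
// DataSourceV2RelationBase, a single helper can serve both the batch and the streaming
// relation types. Everything below is a hypothetical sketch (including the assumed package of
// DataSourceV2RelationBase); only PushDownUtils.pruneColumns itself comes from the patch.
object PruneColumnsExample {
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, NamedExpression}
  import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
  import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2RelationBase, PushDownUtils}

  def pruneAny(
      builder: ScanBuilder,
      relation: DataSourceV2RelationBase,  // either a batch or a streaming relation
      projects: Seq[NamedExpression],
      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
    PushDownUtils.pruneColumns(builder, relation, projects, filters)
  }
}
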
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 31a98e1ff96cb..d17fd6ef1c94d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -62,14 +62,16 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   private def createScanBuilder(plan: LogicalPlan) = plan.transform {
     case r: DataSourceV2Relation =>
       ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
+    case r: StreamingDataSourceV2Relation =>
+      StreamingScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
   }
 
   private def pushDownFilters(plan: LogicalPlan) = plan.transform {
     // update the scan builder with filter push down and return a new plan with filter pushed
-    case Filter(condition, sHolder: ScanBuilderHolder) =>
+    case Filter(condition, sHolder: BaseScanBuilderHolder) =>
       val filters = splitConjunctivePredicates(condition)
       val normalizedFilters =
-        DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output)
+        DataSourceStrategy.normalizeExprs(filters, sHolder.output)
       val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
         normalizedFilters.partition(SubqueryExpression.hasSubquery)
 
@@ -590,15 +592,19 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
-    case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder: ScanBuilderHolder) =>
+    case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder: BaseScanBuilderHolder) =>
       // column pruning
       val normalizedProjects = DataSourceStrategy
        .normalizeExprs(project, sHolder.output)
        .asInstanceOf[Seq[NamedExpression]]
      val allFilters = filtersPushDown.reduceOption(And).toSeq ++ filtersStayUp
      val normalizedFilters = DataSourceStrategy.normalizeExprs(allFilters, sHolder.output)
-      val (scan, output) = PushDownUtils.pruneColumns(
-        sHolder.builder, sHolder.relation, normalizedProjects, normalizedFilters)
+      val (scan, output) = sHolder match {
+        case b: ScanBuilderHolder =>
+          PushDownUtils.pruneColumns(b.builder, b.relation, normalizedProjects, normalizedFilters)
+        case s: StreamingScanBuilderHolder =>
+          PushDownUtils.pruneColumns(s.builder, s.relation, normalizedProjects, normalizedFilters)
+      }
 
       logInfo(
         log"""
@@ -607,7 +613,13 @@
 
       val wrappedScan = getWrappedScan(scan, sHolder)
 
-      val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)
+      val scanRelation: LogicalPlan = sHolder match {
+        case b: ScanBuilderHolder =>
+          DataSourceV2ScanRelation(b.relation, wrappedScan, output)
+        case s: StreamingScanBuilderHolder =>
+          // The stream is not known yet; the streaming execution attaches it after this rule runs.
+          StreamingDataSourceV2ScanRelation(s.relation, wrappedScan, output, null)
+      }
 
       val projectionOverSchema =
         ProjectionOverSchema(output.toStructType, AttributeSet(output))
@@ -789,7 +801,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     DataSourceStrategy.normalizeExprs(expressions, output)
   }
 
-  private def getWrappedScan(scan: Scan, sHolder: ScanBuilderHolder): Scan = {
+  private def getWrappedScan(scan: Scan, sHolder: BaseScanBuilderHolder): Scan = {
     scan match {
       case v1: V1Scan =>
         val pushedFilters = sHolder.builder match {
@@ -803,7 +815,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
-  private def getPushedDownOperators(sHolder: ScanBuilderHolder): PushedDownOperators = {
+  private def getPushedDownOperators(sHolder: BaseScanBuilderHolder): PushedDownOperators = {
     val optRelationName = Option.when(sHolder.joinedRelations.length <= 1)(sHolder.relation.name)
     PushedDownOperators(sHolder.pushedAggregate, sHolder.pushedSample, sHolder.pushedLimit,
       sHolder.pushedOffset, sHolder.sortOrders, sHolder.pushedPredicates,
@@ -811,10 +823,52 @@
   }
 }
 
+sealed trait BaseScanBuilderHolder extends LeafNode {
+  var output: Seq[AttributeReference]
+  val relation: DataSourceV2RelationBase
+  val builder: ScanBuilder
+
+  var pushedLimit: Option[Int]
+  var pushedOffset: Option[Int]
+  var sortOrders: Seq[V2SortOrder]
+  var pushedSample: Option[TableSampleInfo]
+  var pushedPredicates: Seq[Predicate]
+  var pushedAggregate: Option[Aggregation]
+  var pushedAggOutputMap: AttributeMap[Expression]
+  var joinedRelations: Seq[DataSourceV2RelationBase]
+  var joinedRelationsPushedDownOperators: Seq[PushedDownOperators]
+  var pushedJoinOutputMap: AttributeMap[Expression]
+}
+
 case class ScanBuilderHolder(
     var output: Seq[AttributeReference],
     relation: DataSourceV2Relation,
-    builder: ScanBuilder) extends LeafNode {
+    builder: ScanBuilder) extends BaseScanBuilderHolder {
+  var pushedLimit: Option[Int] = None
+
+  var pushedOffset: Option[Int] = None
+
+  var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]
+
+  var pushedSample: Option[TableSampleInfo] = None
+
+  var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate]
+
+  var pushedAggregate: Option[Aggregation] = None
+
+  var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]
+
+  var joinedRelations: Seq[DataSourceV2RelationBase] = Seq(relation)
+
+  var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] = Seq.empty[PushedDownOperators]
+
+  var pushedJoinOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]
+}
+
+case class StreamingScanBuilderHolder(
+    var output: Seq[AttributeReference],
+    relation: StreamingDataSourceV2Relation,
+    builder: ScanBuilder) extends BaseScanBuilderHolder {
   var pushedLimit: Option[Int] = None
 
   var pushedOffset: Option[Int] = None
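// Illustration only, not part of the patch: because both holders extend the sealed
// BaseScanBuilderHolder trait, shared helper code can read the pushdown state without knowing
// whether the underlying scan is batch or streaming. `summarizePushdown` is a hypothetical
// helper built only from the trait members declared above.
def summarizePushdown(holder: BaseScanBuilderHolder): String = {
  val predicates = holder.pushedPredicates.map(_.toString).mkString(", ")
  s"${holder.relation.name}: predicates=[$predicates], " +
    s"limit=${holder.pushedLimit}, sample=${holder.pushedSample.isDefined}, " +
    s"aggregate=${holder.pushedAggregate.isDefined}"
}
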
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 51cd457fbc856..796880eccfad4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -72,10 +72,10 @@ class ContinuousExecution(
   private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
 
   override val logicalPlan: WriteToContinuousDataSource = {
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
     var nextSourceId = 0
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-    val _logicalPlan = analyzedPlan.transform {
+    val basePlan = analyzedPlan.transform {
       case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
           catalog, identifier, _) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
@@ -89,15 +89,26 @@
         logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
           log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
           log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
-        // TODO: operator pushdown.
-        val scan = table.newScanBuilder(options).build()
-        val stream = scan.toContinuousStream(metadataPath)
         val relation = StreamingDataSourceV2Relation(
           table, output, catalog, identifier, options, metadataPath)
-        StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
+        relation
       })
     }
+    // Apply V2ScanRelationPushDown here, while the logical plan is being built, instead of
+    // relying on the optimizer. Continuous processing needs a concrete V2 Scan up front so it
+    // can materialize the ContinuousStream via scan.toContinuousStream, enumerate the sources,
+    // and wire up the checkpoint metadata paths before planning and execution. The optimizer
+    // has not run at this point, so without this step no Scan would exist yet and the stream
+    // could not be created or the sources collected for offset tracking and recovery.
+    val _logicalPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
+      .apply(basePlan)
+      .transform {
+        case r @ StreamingDataSourceV2ScanRelation(rel, scan, _, null, None, None) =>
+          val stream = scan.toContinuousStream(rel.metadataPath)
+          r.copy(stream = stream)
+      }
+
     sources = _logicalPlan.collect {
       case r: StreamingDataSourceV2ScanRelation => r.stream.asInstanceOf[ContinuousStream]
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 174421fcf835e..e4216021ff026 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -158,7 +158,7 @@
     var nextSourceId = 0L
     val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
     val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
-    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
+    val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
     // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
     // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
     // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
@@ -171,7 +171,7 @@
       Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)
 
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
-    val _logicalPlan = analyzedPlan.transform {
+    val basePlan = analyzedPlan.transform {
       case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
@@ -196,12 +196,9 @@
             logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " +
               log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
               log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
-            // TODO: operator pushdown.
-            val scan = table.newScanBuilder(options).build()
-            val stream = scan.toMicroBatchStream(metadataPath)
             val relation = StreamingDataSourceV2Relation(
               table, output, catalog, identifier, options, metadataPath)
-            StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
+            relation
           })
         } else if (v1.isEmpty) {
           throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
@@ -222,6 +219,23 @@
           })
         }
     }
+    // Apply V2ScanRelationPushDown here, while the logical plan is being built, rather than
+    // relying on the optimizer. Micro-batch execution needs a concrete V2 Scan up front so it
+    // can materialize the MicroBatchStream via scan.toMicroBatchStream, enumerate the sources,
+    // and wire up the checkpoint metadata paths before planning, execution, and trigger
+    // initialization. The optimizer has not run at this point, so without this step no Scan
+    // would exist yet and the stream could not be created or the sources collected for offset
+    // tracking and recovery. Running the rule here also applies DSv2 filter and column pushdown.
+    val pushedPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
+      .apply(basePlan)
+      .transform {
+        case r @ StreamingDataSourceV2ScanRelation(rel, scan, _, null, None, None) =>
+          val stream = scan.toMicroBatchStream(rel.metadataPath)
+          r.copy(stream = stream)
+      }
+
+    val _logicalPlan = pushedPlan
+
     sources = _logicalPlan.collect {
       // v1 source
       case s: StreamingExecutionRelation => s.source
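// Usage sketch, mirroring the tests added later in this patch: once the streaming executions
// run V2ScanRelationPushDown, a source whose ScanBuilder implements
// SupportsPushDownRequiredColumns plus SupportsPushDownFilters (or SupportsPushDownV2Filters)
// sees the filter and the projection below pushed into its scan, for both micro-batch and
// continuous triggers. The format names are the test sources registered further down, and
// `spark` is assumed to be an active SparkSession with a checkpoint location configured.
val query = spark.readStream
  .format("pushdown-streaming-v1")
  .load()
  .where("i > 3")   // reaches the source as GreaterThan("i", 3)
  .select("j")      // prunes the required schema down to columns i and j
  .writeStream
  .format("fake-write-microbatch-continuous")
  .trigger(Trigger.AvailableNow())
  .start()
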
diff --git a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index c1fc7234d7c19..391f6f9b9ee20 100644
--- a/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -30,3 +30,7 @@ org.apache.spark.sql.streaming.sources.FakeWriteOnly
 org.apache.spark.sql.streaming.sources.FakeNoWrite
 org.apache.spark.sql.streaming.sources.FakeWriteSupportingExternalMetadata
 org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback
+
+# Streaming pushdown test sources
+org.apache.spark.sql.streaming.sources.PushdownStreamingSourceV1
+org.apache.spark.sql.streaming.sources.PushdownStreamingSourceV2
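// Alternative assertion sketch, illustration only: because each test source added below records
// its most recently created ScanBuilder in its companion object, a test could also assert on the
// captured builder directly instead of collecting the stream out of the query's logical plan.
// This assumes it runs inside the StreamTest suite below, after the micro-batch V2 query with
// `where("i = 5").select("j")` has started.
eventually(timeout(streamingTimeout)) {
  val builder = PushdownStreamingSourceV2.lastBuilder
  assert(builder != null)
  assert(builder.requiredSchema.fieldNames.sorted.sameElements(Array("i", "j")))
  assert(builder.predicates.map(_.toString).sameElements(Array("i = 5")))
}
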
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index ab49fc669ed5d..98a2537fbc100 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -24,25 +24,31 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, MicroBatchStream, Offset, PartitionOffset}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.execution.streaming.runtime.{RateStreamOffset, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SimpleTableProvider
-import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter, StreamSinkProvider}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
 
 @SlowSQLTest
-class FakeDataStream extends MicroBatchStream with ContinuousStream {
+class FakeDataStream(
+    val requiredSchema: StructType = null,
+    val pushedV1Filters: Array[Filter] = Array.empty,
+    val predicates: Array[Predicate] = Array.empty) extends MicroBatchStream with ContinuousStream {
   override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}
@@ -63,6 +69,234 @@ class FakeDataStream extends MicroBatchStream with ContinuousStream {
   }
 }
 
+class PushdownStreamingScanBuilderV1
+  extends ScanBuilder
+  with Scan
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns {
+
+  import org.apache.spark.sql.sources._
+
+  // Start from the table's full schema; pruneColumns() narrows it to the query's columns.
+  var requiredSchema: StructType = PushdownStreamingSourceV1.fullSchema
+  var pushedV1Filters: Array[Filter] = Array.empty
+
+  override def pruneColumns(schema: StructType): Unit = {
+    requiredSchema = schema
+  }
+
+  override def readSchema(): StructType = requiredSchema
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    // Only "i > <int>" is recorded as pushed; all filters are returned as residuals so Spark
+    // keeps evaluating them on top of the scan.
+    val (supported, _) = filters.partition {
+      case GreaterThan("i", _: Int) => true
+      case _ => false
+    }
+    this.pushedV1Filters = supported
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = pushedV1Filters
+
+  override def build(): Scan = this
+
+  // Hand the captured pushdown state to the stream so tests can assert on what reached the source.
+  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
+    new FakeDataStream(requiredSchema, pushedV1Filters, Array.empty)
+  override def toContinuousStream(checkpointLocation: String): ContinuousStream =
+    new FakeDataStream(requiredSchema, pushedV1Filters, Array.empty)
+}
+
+class PushdownStreamingScanBuilderV2
+  extends ScanBuilder
+  with Scan
+  with SupportsPushDownV2Filters
+  with SupportsPushDownRequiredColumns {
+
+  var requiredSchema: StructType = PushdownStreamingSourceV2.fullSchema
+  var predicates: Array[Predicate] = Array.empty
+
+  override def pruneColumns(schema: StructType): Unit = {
+    requiredSchema = schema
+  }
+
+  override def readSchema(): StructType = requiredSchema
+
+  override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
+    // Only equality predicates are recorded as pushed; all predicates are returned as residuals.
+    val (supported, _) = predicates.partition(_.name() == "=")
+    this.predicates = supported
+    predicates
+  }
+
+  override def pushedPredicates(): Array[Predicate] = predicates
+
+  override def build(): Scan = this
+
+  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
+    new FakeDataStream(requiredSchema, Array.empty, predicates)
+  override def toContinuousStream(checkpointLocation: String): ContinuousStream =
+    new FakeDataStream(requiredSchema, Array.empty, predicates)
+}
+
+object PushdownStreamingSourceV1 {
+  // The most recently created builder, so tests can inspect the pushdown state directly.
+  @volatile var lastBuilder: PushdownStreamingScanBuilderV1 = _
+  val fullSchema = StructType(Seq(
+    StructField("i", IntegerType),
+    StructField("j", IntegerType),
+    StructField("k", LongType)
+  ))
+}
+
+class PushdownStreamingSourceV1
+  extends DataSourceRegister
+  with SimpleTableProvider {
+  override def shortName(): String = "pushdown-streaming-v1"
+  override def getTable(options: CaseInsensitiveStringMap): Table = new Table with SupportsRead {
+    override def name(): String = "pushdown_streaming_v1"
+    override def schema(): StructType = PushdownStreamingSourceV1.fullSchema
+    override def capabilities(): util.Set[TableCapability] =
+      util.EnumSet.of(MICRO_BATCH_READ, CONTINUOUS_READ)
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+      val b = new PushdownStreamingScanBuilderV1
+      PushdownStreamingSourceV1.lastBuilder = b
+      b
+    }
+  }
+}
+
+object PushdownStreamingSourceV2 {
+  // The most recently created builder, so tests can inspect the pushdown state directly.
+  @volatile var lastBuilder: PushdownStreamingScanBuilderV2 = _
+  val fullSchema = StructType(Seq(
+    StructField("i", IntegerType),
+    StructField("j", IntegerType),
+    StructField("k", LongType)
+  ))
+}
+
+class PushdownStreamingSourceV2
+  extends DataSourceRegister
+  with SimpleTableProvider {
+  override def shortName(): String = "pushdown-streaming-v2"
+  override def getTable(options: CaseInsensitiveStringMap): Table = new Table with SupportsRead {
+    override def name(): String = "pushdown_streaming_v2"
+    override def schema(): StructType = PushdownStreamingSourceV2.fullSchema
+    override def capabilities(): util.Set[TableCapability] =
+      util.EnumSet.of(MICRO_BATCH_READ, CONTINUOUS_READ)
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+      val b = new PushdownStreamingScanBuilderV2
+      PushdownStreamingSourceV2.lastBuilder = b
+      b
+    }
+  }
+}
+
+class StreamingDataSourceV2PushdownSuite extends StreamTest {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val fakeCheckpoint = Utils.createTempDir()
+    spark.conf.set(SQLConf.CHECKPOINT_LOCATION.key, fakeCheckpoint.getCanonicalPath)
+  }
+
+  test("streaming v2 pushdown: v1 filters and column pruning (micro-batch)") {
+    val q = spark.readStream
+      .format("pushdown-streaming-v1")
+      .load()
+      .where("i > 3")
+      .select("j")
+      .writeStream
+      .format("fake-write-microbatch-continuous")
+      .trigger(Trigger.AvailableNow())
+      .start()
+
+    try {
+      eventually(timeout(streamingTimeout)) {
+        // The FakeDataStream carries whatever schema and filters reached the source's builder.
+        val stream = q.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
+          case r: StreamingDataSourceV2ScanRelation => r.stream
+        }.head.asInstanceOf[FakeDataStream]
+        assert(stream.requiredSchema.fieldNames.sorted.sameElements(Array("i", "j")))
+        assert(stream.pushedV1Filters.map(_.toString).sameElements(Array("GreaterThan(i,3)")))
+      }
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("streaming v2 pushdown: v2 predicates and column pruning (micro-batch)") {
+    val q = spark.readStream
+      .format("pushdown-streaming-v2")
+      .load()
+      .where("i = 5")
+      .select("j")
+      .writeStream
+      .format("fake-write-microbatch-continuous")
+      .trigger(Trigger.AvailableNow())
+      .start()
+
+    try {
+      eventually(timeout(streamingTimeout)) {
+        val stream = q.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
+          case r: StreamingDataSourceV2ScanRelation => r.stream
+        }.head.asInstanceOf[FakeDataStream]
+        assert(stream.requiredSchema.fieldNames.sorted.sameElements(Array("i", "j")))
+        assert(stream.predicates.map(_.toString()).sameElements(Array("i = 5")))
+      }
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("streaming v2 pushdown: v1 filters and column pruning (continuous)") {
+    val q = spark.readStream
+      .format("pushdown-streaming-v1")
+      .load()
+      .where("i > 100")
+      .select("j")
+      .writeStream
+      .format("fake-write-microbatch-continuous")
+      .trigger(Trigger.Continuous(1000))
+      .start()
+
+    try {
+      eventually(timeout(streamingTimeout)) {
+        val stream = q.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
+          case r: StreamingDataSourceV2ScanRelation => r.stream
+        }.head.asInstanceOf[FakeDataStream]
+        assert(stream.requiredSchema.fieldNames.sorted.sameElements(Array("i", "j")))
+        assert(stream.pushedV1Filters.map(_.toString).sameElements(Array("GreaterThan(i,100)")))
+      }
+    } finally {
+      q.stop()
+    }
+  }
+
+  test("streaming v2 pushdown: v2 predicates and column pruning (continuous)") {
+    val q = spark.readStream
+      .format("pushdown-streaming-v2")
+      .load()
+      .where("i = 500")
+      .select("j")
+      .writeStream
+      .format("fake-write-microbatch-continuous")
+      .trigger(Trigger.Continuous(1000))
+      .start()
+
+    try {
+      eventually(timeout(streamingTimeout)) {
+        val stream = q.asInstanceOf[StreamingQueryWrapper].streamingQuery.logicalPlan.collect {
+          case r: StreamingDataSourceV2ScanRelation => r.stream
+        }.head.asInstanceOf[FakeDataStream]
+        assert(stream.requiredSchema.fieldNames.sorted.sameElements(Array("i", "j")))
+        assert(stream.predicates.map(_.toString()).sameElements(Array("i = 500")))
+      }
+    } finally {
+      q.stop()
+    }
+  }
+}
+
 
 class FakeScanBuilder extends ScanBuilder with Scan {
   override def build(): Scan = this
   override def readSchema(): StructType = StructType(Seq())