@@ -181,7 +181,7 @@ object PushDownUtils {
*/
def pruneColumns(
scanBuilder: ScanBuilder,
relation: DataSourceV2Relation,
relation: DataSourceV2RelationBase,
projects: Seq[NamedExpression],
filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
val exprs = projects ++ filters
@@ -208,13 +208,13 @@ object PushDownUtils {
// the underlying table schema
scan -> toOutputAttrs(scan.readSchema(), relation)

case _ => scanBuilder.build() -> relation.output
case _ => scanBuilder.build() -> relation.output.asInstanceOf[Seq[AttributeReference]]
}
}

def toOutputAttrs(
schema: StructType,
relation: DataSourceV2Relation): Seq[AttributeReference] = {
relation: DataSourceV2RelationBase): Seq[AttributeReference] = {
val nameToAttr = Utils.toMap(relation.output.map(_.name), relation.output)
val cleaned = CharVarcharUtils.replaceCharVarcharWithStringInSchema(schema)
toAttributes(cleaned).map {
@@ -62,14 +62,16 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
private def createScanBuilder(plan: LogicalPlan) = plan.transform {
case r: DataSourceV2Relation =>
ScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
case r: StreamingDataSourceV2Relation =>
StreamingScanBuilderHolder(r.output, r, r.table.asReadable.newScanBuilder(r.options))
}

private def pushDownFilters(plan: LogicalPlan) = plan.transform {
// update the scan builder with filter push down and return a new plan with filter pushed
case Filter(condition, sHolder: ScanBuilderHolder) =>
case Filter(condition, sHolder: BaseScanBuilderHolder) =>
val filters = splitConjunctivePredicates(condition)
val normalizedFilters =
DataSourceStrategy.normalizeExprs(filters, sHolder.relation.output)
DataSourceStrategy.normalizeExprs(filters, sHolder.output)
val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
normalizedFilters.partition(SubqueryExpression.hasSubquery)

@@ -590,15 +592,19 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
}

def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder: ScanBuilderHolder) =>
case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder: BaseScanBuilderHolder) =>
// column pruning
val normalizedProjects = DataSourceStrategy
.normalizeExprs(project, sHolder.output)
.asInstanceOf[Seq[NamedExpression]]
val allFilters = filtersPushDown.reduceOption(And).toSeq ++ filtersStayUp
val normalizedFilters = DataSourceStrategy.normalizeExprs(allFilters, sHolder.output)
val (scan, output) = PushDownUtils.pruneColumns(
sHolder.builder, sHolder.relation, normalizedProjects, normalizedFilters)
val (scan, output) = sHolder match {
case b: ScanBuilderHolder =>
PushDownUtils.pruneColumns(b.builder, b.relation, normalizedProjects, normalizedFilters)
case s: StreamingScanBuilderHolder =>
PushDownUtils.pruneColumns(s.builder, s.relation, normalizedProjects, normalizedFilters)
}

logInfo(
log"""
@@ -607,7 +613,13 @@

val wrappedScan = getWrappedScan(scan, sHolder)

val scanRelation = DataSourceV2ScanRelation(sHolder.relation, wrappedScan, output)
val scanRelation: LogicalPlan = sHolder match {
case b: ScanBuilderHolder =>
DataSourceV2ScanRelation(b.relation, wrappedScan, output)
case s: StreamingScanBuilderHolder =>
// For streaming relations, build the scan relation without a stream; the streaming
// execution attaches the stream when it constructs its logical plan.
StreamingDataSourceV2ScanRelation(s.relation, wrappedScan, output, null)
}

val projectionOverSchema =
ProjectionOverSchema(output.toStructType, AttributeSet(output))
@@ -789,7 +801,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
DataSourceStrategy.normalizeExprs(expressions, output)
}

private def getWrappedScan(scan: Scan, sHolder: ScanBuilderHolder): Scan = {
private def getWrappedScan(scan: Scan, sHolder: BaseScanBuilderHolder): Scan = {
scan match {
case v1: V1Scan =>
val pushedFilters = sHolder.builder match {
@@ -803,18 +815,60 @@
}
}

private def getPushedDownOperators(sHolder: ScanBuilderHolder): PushedDownOperators = {
private def getPushedDownOperators(sHolder: BaseScanBuilderHolder): PushedDownOperators = {
val optRelationName = Option.when(sHolder.joinedRelations.length <= 1)(sHolder.relation.name)
PushedDownOperators(sHolder.pushedAggregate, sHolder.pushedSample,
sHolder.pushedLimit, sHolder.pushedOffset, sHolder.sortOrders, sHolder.pushedPredicates,
sHolder.joinedRelationsPushedDownOperators, optRelationName)
}
}

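// Common interface for the batch and streaming scan-builder holders below: the prunable
// output, the originating DSv2 relation, the ScanBuilder being populated, and mutable state
// recording which operators (limit, offset, sort, sample, predicates, aggregates, joins)
// have been pushed into the scan so far.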
sealed trait BaseScanBuilderHolder extends LeafNode {
var output: Seq[AttributeReference]
val relation: DataSourceV2RelationBase
val builder: ScanBuilder

var pushedLimit: Option[Int]
var pushedOffset: Option[Int]
var sortOrders: Seq[V2SortOrder]
var pushedSample: Option[TableSampleInfo]
var pushedPredicates: Seq[Predicate]
var pushedAggregate: Option[Aggregation]
var pushedAggOutputMap: AttributeMap[Expression]
var joinedRelations: Seq[DataSourceV2RelationBase]
var joinedRelationsPushedDownOperators: Seq[PushedDownOperators]
var pushedJoinOutputMap: AttributeMap[Expression]
}

case class ScanBuilderHolder(
var output: Seq[AttributeReference],
relation: DataSourceV2Relation,
builder: ScanBuilder) extends LeafNode {
builder: ScanBuilder) extends BaseScanBuilderHolder {
var pushedLimit: Option[Int] = None

var pushedOffset: Option[Int] = None

var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]

var pushedSample: Option[TableSampleInfo] = None

var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate]

var pushedAggregate: Option[Aggregation] = None

var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]

var joinedRelations: Seq[DataSourceV2RelationBase] = Seq(relation)

var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] = Seq.empty[PushedDownOperators]

var pushedJoinOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]
}

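// Streaming counterpart of ScanBuilderHolder, wrapping a StreamingDataSourceV2Relation so
// the same pushdown rule can build scans for micro-batch and continuous queries.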
case class StreamingScanBuilderHolder(
var output: Seq[AttributeReference],
relation: StreamingDataSourceV2Relation,
builder: ScanBuilder) extends BaseScanBuilderHolder {
var pushedLimit: Option[Int] = None

var pushedOffset: Option[Int] = None
@@ -72,10 +72,10 @@ class ContinuousExecution(
private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)

override val logicalPlan: WriteToContinuousDataSource = {
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
val basePlan = analyzedPlan.transform {
case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
catalog, identifier, _) =>
val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
@@ -89,15 +89,26 @@
logInfo(log"Reading table [${MDC(STREAMING_TABLE, table)}] " +
log"from DataSourceV2 named '${MDC(STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
log"${MDC(STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toContinuousStream(metadataPath)
val relation = StreamingDataSourceV2Relation(
table, output, catalog, identifier, options, metadataPath)
StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
relation
})
}

// Run V2ScanRelationPushDown here, while constructing the streaming logical plan, instead of
// relying on the optimizer. Continuous processing needs a concrete V2 Scan this early so it
// can materialize the ContinuousStream via scan.toContinuousStream, enumerate sources, and
// wire up checkpoint metadata paths before planning and execution. If pushdown were left to
// the optimizer, no Scan would exist at this point, so the stream could not be created and
// sources could not be collected reliably for offset tracking and recovery.
val _logicalPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
.apply(basePlan)
.transform {
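// The pushdown rule leaves the stream unset (null); create and attach the ContinuousStream now.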
case r @ StreamingDataSourceV2ScanRelation(rel, scan, out, null, None, None) =>
val stream = scan.toContinuousStream(rel.metadataPath)
r.copy(stream = stream)
}

sources = _logicalPlan.collect {
case r: StreamingDataSourceV2ScanRelation => r.stream.asInstanceOf[ContinuousStream]
}
@@ -158,7 +158,7 @@ class MicroBatchExecution(
var nextSourceId = 0L
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
// We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
// map as we go to ensure each identical relation gets the same StreamingExecutionRelation
// object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
@@ -171,7 +171,7 @@
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
val basePlan = analyzedPlan.transform {
case streamingRelation @ StreamingRelation(dataSourceV1, sourceName, output) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
@@ -196,12 +196,9 @@
logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " +
log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
// TODO: operator pushdown.
val scan = table.newScanBuilder(options).build()
val stream = scan.toMicroBatchStream(metadataPath)
val relation = StreamingDataSourceV2Relation(
table, output, catalog, identifier, options, metadataPath)
StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
relation
})
} else if (v1.isEmpty) {
throw QueryExecutionErrors.microBatchUnsupportedByDataSourceError(
@@ -222,6 +219,23 @@
})
}
}
// Run V2ScanRelationPushDown here, while constructing the streaming logical plan, rather than
// relying on the optimizer. Micro-batch execution needs a concrete V2 Scan this early so it
// can materialize the MicroBatchStream via scan.toMicroBatchStream, enumerate sources, and
// wire up checkpoint metadata paths before planning, execution, and trigger initialization.
// If pushdown were left to the optimizer, no Scan would exist at this point, so the stream
// could not be created and sources could not be collected reliably for offset tracking and
// recovery. Applying the rule here also performs DSv2 filter and column pushdown while the
// scans are built.
val pushedPlan = org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown
.apply(basePlan)
.transform {
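// The pushdown rule leaves the stream unset (null); create and attach the MicroBatchStream now.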
case r @ StreamingDataSourceV2ScanRelation(rel, scan, out, null, None, None) =>
val stream = scan.toMicroBatchStream(rel.metadataPath)
r.copy(stream = stream)
}

val _logicalPlan = pushedPlan

sources = _logicalPlan.collect {
// v1 source
case s: StreamingExecutionRelation => s.source
@@ -30,3 +30,7 @@ org.apache.spark.sql.streaming.sources.FakeWriteOnly
org.apache.spark.sql.streaming.sources.FakeNoWrite
org.apache.spark.sql.streaming.sources.FakeWriteSupportingExternalMetadata
org.apache.spark.sql.streaming.sources.FakeWriteSupportProviderV1Fallback

# Streaming pushdown test sources
org.apache.spark.sql.streaming.sources.PushdownStreamingSourceV1
org.apache.spark.sql.streaming.sources.PushdownStreamingSourceV2