@@ -39,8 +39,10 @@ case class StreamingRelationV2(
     output: Seq[AttributeReference],
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
-    v1Relation: Option[LogicalPlan])
-  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
+    v1Relation: Option[LogicalPlan],
+    sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
+  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns
+  with HasStreamingSourceIdentifyingName {
   override lazy val resolved = v1Relation.forall(_.resolved)
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
@@ -59,12 +61,16 @@ case class StreamingRelationV2(
     val newMetadata = metadataOutput.filterNot(outputSet.contains)
     if (newMetadata.nonEmpty) {
       StreamingRelationV2(source, sourceName, table, extraOptions,
-        output ++ newMetadata, catalog, identifier, v1Relation)
+        output ++ newMetadata, catalog, identifier, v1Relation, sourceIdentifyingName)
     } else {
       this
     }
   }
 
+  override def withSourceIdentifyingName(name: StreamingSourceIdentifyingName): LogicalPlan = {
+    copy(sourceIdentifyingName = name)
+  }
+
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(conf.defaultSizeInBytes)
   )
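The change above follows the defaulted-field-plus-copy-setter idiom, so existing positional constructor calls keep compiling while analyzer rules can assign a name immutably. A minimal, self-contained sketch of the idiom; DemoRelation, SourceName, and AssignedName are illustrative stand-ins, not types from this PR:

// Stand-ins mirroring the real ADT: Unassigned is the default state.
sealed trait SourceName
case object Unassigned extends SourceName
final case class AssignedName(value: String) extends SourceName

final case class DemoRelation(
    sourceName: String,
    sourceIdentifyingName: SourceName = Unassigned) {
  // Mirrors withSourceIdentifyingName above: return an updated copy, never mutate.
  def withSourceIdentifyingName(name: SourceName): DemoRelation =
    copy(sourceIdentifyingName = name)
}

// e.g. in a REPL: old call sites compile unchanged, the name defaults to Unassigned.
val rel = DemoRelation("kafka")
val named = rel.withSourceIdentifyingName(AssignedName("source-0"))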
@@ -17,6 +17,22 @@
 
 package org.apache.spark.sql.catalyst.streaming
 
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A trait for logical plans that have a streaming source identifying name.
+ *
+ * This trait provides a common interface for both V1 (StreamingRelation) and V2
+ * (StreamingRelationV2) streaming sources, allowing analyzer rules in sql/catalyst
+ * to uniformly handle source naming without module boundary issues.
+ *
+ * The self-type constraint ensures this trait can only be mixed into LogicalPlan subclasses.
+ */
+trait HasStreamingSourceIdentifyingName { self: LogicalPlan =>
+  def sourceIdentifyingName: StreamingSourceIdentifyingName
+  def withSourceIdentifyingName(name: StreamingSourceIdentifyingName): LogicalPlan
+}
+
 /**
  * Represents the identifying name state for a streaming source during query analysis.
  *
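Placing this trait in sql/catalyst is what lets analyzer rules name sources without referencing sql/core classes. A hedged sketch of such a rule, assuming it lives where HasStreamingSourceIdentifyingName and Unassigned are in scope; AssignSourceName is hypothetical, not part of this PR, and a real rule would presumably derive a distinct name per source:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

case class AssignSourceName(name: StreamingSourceIdentifyingName) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    // The trait match covers the V1 StreamingRelation and StreamingRelationV2
    // alike; only sources still in the Unassigned state are touched.
    case r: HasStreamingSourceIdentifyingName if r.sourceIdentifyingName == Unassigned =>
      r.withSourceIdentifyingName(name)
  }
}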
@@ -1,2 +1,2 @@
 ~SubqueryAlias primary.tempdb.myStreamingTable
-+- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
++- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable, name=<Unassigned>
@@ -331,7 +331,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       result
 
     case s @ StreamingRelationV2(
-        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+        _, _, table, extraOptions, _, _, _,
+        Some(UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       val v1Relation = getStreamingRelation(tableMeta, extraOptions)
       if (table.isInstanceOf[SupportsRead]
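This and the two call sites below show the mechanical cost of widening the case class: every positional match gains a trailing _. A hypothetical name-based extractor, not part of this PR, sketches how such rules could be insulated from future arity changes:

// Sketch only; imports mirror the surrounding file's. Matching just the
// fields a rule needs means widening StreamingRelationV2 again later would
// not break this call site.
object StreamingRelationV2WithV1 {
  def unapply(plan: LogicalPlan): Option[(Table, LogicalPlan)] = plan match {
    case s: StreamingRelationV2 => s.v1Relation.map(v1 => (s.table, v1))
    case _ => None
  }
}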
@@ -79,7 +79,7 @@ class ContinuousExecution(
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
-          catalog, identifier, _) =>
+          catalog, identifier, _, _) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
         if (!table.supports(TableCapability.CONTINUOUS_READ)) {
           throw QueryExecutionErrors.continuousProcessingUnsupportedByDataSourceError(sourceName)
@@ -198,7 +198,7 @@ class MicroBatchExecution(
       })
 
     case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
-        catalog, identifier, v1) =>
+        catalog, identifier, v1, _) =>
       val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
      val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
       if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {