diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index c1d7daa6cfcf3..ff0387fb00a2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -39,8 +39,10 @@ case class StreamingRelationV2(
     output: Seq[AttributeReference],
    catalog: Option[CatalogPlugin],
    identifier: Option[Identifier],
-    v1Relation: Option[LogicalPlan])
-  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
+    v1Relation: Option[LogicalPlan],
+    sourceIdentifyingName: StreamingSourceIdentifyingName = Unassigned)
+  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns
+  with HasStreamingSourceIdentifyingName {
   override lazy val resolved = v1Relation.forall(_.resolved)
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
@@ -59,12 +61,16 @@ case class StreamingRelationV2(
     val newMetadata = metadataOutput.filterNot(outputSet.contains)
     if (newMetadata.nonEmpty) {
       StreamingRelationV2(source, sourceName, table, extraOptions,
-        output ++ newMetadata, catalog, identifier, v1Relation)
+        output ++ newMetadata, catalog, identifier, v1Relation, sourceIdentifyingName)
     } else {
       this
     }
   }
 
+  override def withSourceIdentifyingName(name: StreamingSourceIdentifyingName): LogicalPlan = {
+    copy(sourceIdentifyingName = name)
+  }
+
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(conf.defaultSizeInBytes)
   )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
index ce5ac5be3b5ad..b89aaf017b1da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
@@ -17,6 +17,22 @@
 
 package org.apache.spark.sql.catalyst.streaming
 
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * A trait for logical plans that have a streaming source identifying name.
+ *
+ * This trait provides a common interface for both V1 (StreamingRelation) and V2
+ * (StreamingRelationV2) streaming sources, allowing analyzer rules in sql/catalyst
+ * to uniformly handle source naming without module boundary issues.
+ *
+ * The self-type constraint ensures this trait can only be mixed into LogicalPlan subclasses.
+ */
+trait HasStreamingSourceIdentifyingName { self: LogicalPlan =>
+  def sourceIdentifyingName: StreamingSourceIdentifyingName
+  def withSourceIdentifyingName(name: StreamingSourceIdentifyingName): LogicalPlan
+}
+
 /**
  * Represents the identifying name state for a streaming source during query analysis.
  *
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
index 2cc166efa99ec..636b53788d39f 100644
--- a/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/streaming_table_API_with_options.explain
@@ -1,2 +1,2 @@
 ~SubqueryAlias primary.tempdb.myStreamingTable
-+- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable
++- ~StreamingRelationV2 primary.tempdb.myStreamingTable, org.apache.spark.sql.connector.catalog.InMemoryTable, [p1=v1, p2=v2], [id#0L], org.apache.spark.sql.connector.catalog.InMemoryCatalog, tempdb.myStreamingTable, name=
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 539bcb6c29e89..48b91a064e3ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -331,7 +331,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
       result
 
     case s @ StreamingRelationV2(
-        _, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
+        _, _, table, extraOptions, _, _, _,
+        Some(UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       val v1Relation = getStreamingRelation(tableMeta, extraOptions)
       if (table.isInstanceOf[SupportsRead]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index ad7e9f3e4aa96..14cd06038b5af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -79,7 +79,7 @@ class ContinuousExecution(
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
       case s @ StreamingRelationV2(ds, sourceName, table: SupportsRead, options, output,
-        catalog, identifier, _) =>
+        catalog, identifier, _, _) =>
         val dsStr = if (ds.nonEmpty) s"[${ds.get}]" else ""
         if (!table.supports(TableCapability.CONTINUOUS_READ)) {
           throw QueryExecutionErrors.continuousProcessingUnsupportedByDataSourceError(sourceName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index b0c6159517a96..d81b2276eab73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -198,7 +198,7 @@ class MicroBatchExecution(
         })
 
       case s @ StreamingRelationV2(src, srcName, table: SupportsRead, options, output,
-          catalog, identifier, v1) =>
+          catalog, identifier, v1, _) =>
         val dsStr = if (src.nonEmpty) s"[${src.get}]" else ""
         val v2Disabled = disabledSources.contains(src.getOrElse(None).getClass.getCanonicalName)
        if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
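For context, a minimal sketch of how an analyzer rule could consume the new trait. The rule object AssignStreamingSourceNames and the Assigned(...) state are illustrative assumptions, not part of this diff (only Unassigned and the two trait methods appear above); the point is that one pattern match in sql/catalyst now covers both StreamingRelation (V1) and StreamingRelationV2.

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.catalyst.streaming.{HasStreamingSourceIdentifyingName, Unassigned}

    // Hypothetical analyzer rule: because the trait lives in sql/catalyst, the
    // rule needs no reference to the concrete V1/V2 relation classes in sql/core.
    object AssignStreamingSourceNames extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
        // The self-type on HasStreamingSourceIdentifyingName guarantees every
        // match here is also a LogicalPlan, so transform's partial function is sound.
        case r: HasStreamingSourceIdentifyingName
            if r.sourceIdentifyingName == Unassigned =>
          // Assigned(...) is an assumed "resolved name" state for illustration;
          // this diff only shows the Unassigned default.
          r.withSourceIdentifyingName(Assigned("source_0"))
      }
    }

Note that withSourceIdentifyingName is implemented with the case-class copy, so all other fields of StreamingRelationV2 (output, catalog, v1Relation, etc.) are preserved when the name is assigned.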