diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 24a12248ce725..feb5683e272ec 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -6093,6 +6093,11 @@ "Streaming query evolution error:" ], "subClass" : { + "CONFIG_MISMATCH" : { + "message" : [ + "Configuration mismatch detected when resuming streaming query. The session config for spark.sql.streaming.queryEvolution.enableSourceEvolution is '<sessionValue>', but the checkpoint was created with '<checkpointValue>'. Please set the session config to match the checkpoint value to ensure consistent query behavior." + ] + }, "DUPLICATE_SOURCE_NAMES" : { "message" : [ "Duplicate streaming source names detected: <names>. Each streaming source must have a unique name." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala index b89aaf017b1da..74feaf7164a17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala @@ -47,6 +47,32 @@ sealed trait StreamingSourceIdentifyingName { case FlowAssigned(name) => s"""name="$name"""" case Unassigned => "name=" } + + /** + * Extracts only user-provided names, filtering out flow-assigned or unassigned sources. + * Used when narrowing to explicitly user-specified names (e.g., when passing to DataSource). + * + * @return Some(UserProvided) if this is a user-provided name, None otherwise + */ + def toUserProvided: Option[UserProvided] = this match { + case up: UserProvided => Some(up) + case _ => None + } + + /** + * Extracts the name string from named sources (UserProvided or FlowAssigned). + * Returns None for Unassigned sources. + * + * Useful for pattern matching when both UserProvided and FlowAssigned should be + * treated identically (e.g., when computing metadata paths or building sourceIdMap).
+ * + * @return Some(name) for named sources, None for Unassigned + */ + def nameOpt: Option[String] = this match { + case UserProvided(name) => Some(name) + case FlowAssigned(name) => Some(name) + case Unassigned => None + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 2ce43cf857a93..7b2405ad01918 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1832,6 +1832,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE "operation" -> operation)) } + def streamingSourceEvolutionConfigMismatchError( + sessionValue: String, + checkpointValue: String): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH", + messageParameters = Map( + "sessionValue" -> sessionValue, + "checkpointValue" -> checkpointValue)) + } + def pythonStreamingDataSourceRuntimeError( action: String, message: String): SparkException = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala index 35ff39ce82682..2f139393ade38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedDataSource} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned} +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.SparkSession @@ -84,10 +84,11 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = source, - options = optionsWithPath.originalMap) + options = optionsWithPath.originalMap, + userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided) val v1Relation = ds match { case _: StreamSourceProvider => - Some(StreamingRelation(v1DataSource, sourceIdentifyingName)) + Some(StreamingRelation(v1DataSource)) case _ => None } ds match { @@ -112,16 +113,16 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { StreamingRelationV2( Some(provider), source, table, dsOptions, toAttributes(table.columns.asSchema), None, None, v1Relation, - sourceIdentifyingName) + v1DataSource.streamingSourceIdentifyingName) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. - case _ => StreamingRelation(v1DataSource, sourceIdentifyingName) + case _ => StreamingRelation(v1DataSource) } case _ => // Code path for data source v1. 
- StreamingRelation(v1DataSource, sourceIdentifyingName) + StreamingRelation(v1DataSource) } case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths) => @@ -148,7 +149,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { options = optionsWithPath.originalMap) val v1Relation = ds match { case _: StreamSourceProvider => - Some(StreamingRelation(v1DataSource, Unassigned)) + Some(StreamingRelation(v1DataSource)) case _ => None } ds match { @@ -173,7 +174,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { StreamingRelationV2( Some(provider), source, table, dsOptions, toAttributes(table.columns.asSchema), None, None, v1Relation, - Unassigned) + v1DataSource.streamingSourceIdentifyingName) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a48c71ad0d362..5acaef0392fe8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName +import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned, UserProvided} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils} import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.classic.Dataset @@ -103,13 +103,16 @@ case class DataSource( bucketSpec: Option[BucketSpec] = None, options: Map[String, String] = Map.empty, catalogTable: Option[CatalogTable] = None, - userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None) + userSpecifiedStreamingSourceName: Option[UserProvided] = None) extends SessionStateHelper with Logging { case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) private val conf: SQLConf = getSqlConf(sparkSession) + lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName = + userSpecifiedStreamingSourceName.getOrElse(Unassigned) + lazy val providingClass: Class[_] = { val cls = DataSource.lookupDataSource(className, conf) // `providingClass` is used for resolving data source relation for catalog tables. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 220b0b8a63013..63782c753696c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] userSpecifiedSchema = Some(table.schema), options = dsOptions, catalogTable = Some(table), - userSpecifiedStreamingSourceName = Some(sourceIdentifyingName)) + userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided) StreamingRelation(dataSource) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index 23c266e828914..5c94af3776e5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -203,7 +203,7 @@ object OffsetSeqMetadata extends Logging { STATE_STORE_ROCKSDB_FORMAT_VERSION, STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION, PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT, - STATE_STORE_ROW_CHECKSUM_ENABLED + STATE_STORE_ROW_CHECKSUM_ENABLED, ENABLE_STREAMING_SOURCE_EVOLUTION ) /** @@ -251,7 +251,8 @@ object OffsetSeqMetadata extends Logging { PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true", STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow", STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false", - STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1" + STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1", + ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false" ) def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: ConfigEntry[T]): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index a4f50993f64ba..7f54fba1ada36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming.runtime +import java.util.concurrent.atomic.AtomicLong + import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -30,7 +32,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream} +import 
org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream} import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.{Dataset, SparkSession} @@ -48,6 +50,7 @@ import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetH import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils, StateSchemaBroadcast, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.utils.StreamingUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex import org.apache.spark.sql.streaming.Trigger @@ -168,7 +171,7 @@ class MicroBatchExecution( assert(queryExecutionThread eq Thread.currentThread, "logicalPlan must be initialized in QueryExecutionThread " + s"but the current thread was ${Thread.currentThread}") - var nextSourceId = 0L + val nextSourceId = new AtomicLong(0L) val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]() val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]() @@ -183,15 +186,17 @@ class MicroBatchExecution( val disabledSources = Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders) + val enforceNamed = sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ val _logicalPlan = analyzedPlan.transform { case streamingRelation @ StreamingRelation( dataSourceV1, sourceName, output, sourceIdentifyingName) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val metadataPath = StreamingUtils.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = dataSourceV1.createSource(metadataPath) - nextSourceId += 1 logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " + log"from DataSourceV1 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " + log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]") @@ -206,8 +211,8 @@ class MicroBatchExecution( if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) { v2ToRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - nextSourceId += 1 + val metadataPath = StreamingUtils.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " + log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " + log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}") @@ -224,7 +229,8 @@ class MicroBatchExecution( trigger match { case RealTimeTrigger(duration) => Some(duration) case _ => None - } + }, + sourceIdentifyingName ) StreamingDataSourceV2ScanRelation(relation, scan, output, stream) }) @@ -234,10 +240,10 @@ class 
MicroBatchExecution( } else { v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val metadataPath = StreamingUtils.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath) - nextSourceId += 1 logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] from " + log"DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " + log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}") @@ -247,17 +253,37 @@ class MicroBatchExecution( }) } } - sources = _logicalPlan.collect { + + // Extract sources and their sourceIdentifyingName for sourceIdMap mapping + val sourcesWithNames = _logicalPlan.collect { // v1 source - case s: StreamingExecutionRelation => s.source + case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName) // v2 source - case r: StreamingDataSourceV2ScanRelation => r.stream + case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName) + } + sources = sourcesWithNames.map(_._1) + + if (enforceNamed) { + // When enforcement is enabled, all sources should be named after validation in analysis. + // This assertion ensures that the validation in NameStreamingSources worked correctly. + assert(sourcesWithNames.forall(s => s._2 != Unassigned), + "All sources should be named at this point - validation should have happened in analysis") + + // Create source ID mapping using names (for OffsetMap format) + sourceIdMap = sourcesWithNames.map { case (source, sourceIdentifyingName) => + sourceIdentifyingName.nameOpt match { + case Some(name) => name -> source + case None => + throw new IllegalStateException( + "Unassigned sources should not exist when enforcement is enabled") + } + }.toMap + } else { + // When enforcement is disabled, use positional indices (backward compatibility) + sourceIdMap = sources.zipWithIndex.map { + case (source, index) => index.toString -> source + }.toMap } - - // Create source ID mapping for OffsetMap support - sourceIdMap = sources.zipWithIndex.map { - case (source, index) => index.toString -> source - }.toMap // Inform the source if it is in real time mode if (trigger.isInstanceOf[RealTimeTrigger]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala index 65c1226a85dff..037d65c87509f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.classic.{SparkSession, StreamingQuery} import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream} import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.execution.streaming.ContinuousTrigger @@ -337,6 +338,29 @@ abstract class StreamExecution( } getLatestExecutionContext().updateStatusMessage("Initializing sources") + + // Restore 
relevant SQL configs from checkpoint before evaluating logicalPlan. + // This ensures configs like ENABLE_STREAMING_SOURCE_EVOLUTION are set correctly + // when the logical plan determines source naming strategy. + val sessionEnforcementBeforeRestore = + sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution + offsetLog.getLatest().foreach { case (_, offsetSeq) => + offsetSeq.metadataOpt.foreach { metadata => + OffsetSeqMetadata.setSessionConf(metadata, sparkSessionForStream.sessionState.conf) + + // Validate that critical configs haven't changed since checkpoint was created. + // This prevents config drift that could lead to incorrect query behavior. + val checkpointEnforcement = + OffsetSeqMetadata.readValueOpt( + metadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION).getOrElse("false") + if (sessionEnforcementBeforeRestore.toString != checkpointEnforcement) { + throw QueryExecutionErrors.streamingSourceEvolutionConfigMismatchError( + sessionValue = sessionEnforcementBeforeRestore.toString, + checkpointValue = checkpointEnforcement) + } + } + } + // force initialization of the logical plan so that the sources can be created logicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala index 232f3475bb1bd..efa65bc5ead49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala @@ -36,18 +36,9 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { // Extract source identifying name from DataSource for stable checkpoints - val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned) StreamingRelation( dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema), - sourceIdentifyingName) - } - - def apply( - dataSource: DataSource, - sourceIdentifyingName: StreamingSourceIdentifyingName): StreamingRelation = { - StreamingRelation( - dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema), - sourceIdentifyingName) + dataSource.streamingSourceIdentifyingName) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala index d2654ac943b2c..8dfc1fd23b57d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala @@ -16,13 +16,59 @@ */ package org.apache.spark.sql.execution.streaming.utils +import java.util.concurrent.atomic.AtomicLong + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName + object StreamingUtils { + /** + * Resolves a checkpoint location to a fully qualified path. + * + * Converts relative or unqualified paths to fully qualified paths using the + * file system's URI and working directory. 
+ * + * @param hadoopConf Hadoop configuration to access the file system + * @param checkpointLocation The checkpoint location path (may be relative or unqualified) + * @return Fully qualified checkpoint location URI as a string + */ def resolvedCheckpointLocation(hadoopConf: Configuration, checkpointLocation: String): String = { val checkpointPath = new Path(checkpointLocation) val fs = checkpointPath.getFileSystem(hadoopConf) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } + + /** + * Computes the metadata path for a streaming source based on its identifying name. + * + * Named sources (UserProvided/FlowAssigned) use stable name-based paths, enabling + * source evolution (reordering, adding, or removing sources without breaking state). + * Unassigned sources use sequential IDs for backward compatibility. + * + * Examples: + * - UserProvided("mySource") => "$checkpointRoot/sources/mySource" + * - FlowAssigned("source_1") => "$checkpointRoot/sources/source_1" + * - Unassigned => "$checkpointRoot/sources/0" (increments nextSourceId) + * + * @param sourceIdentifyingName The source's identifying name + * (UserProvided, FlowAssigned, or Unassigned) + * @param nextSourceId AtomicLong tracking the next positional source ID for Unassigned sources + * @param resolvedCheckpointRoot The resolved checkpoint root path + * @return The computed metadata path string + */ + def getMetadataPath( + sourceIdentifyingName: StreamingSourceIdentifyingName, + nextSourceId: AtomicLong, + resolvedCheckpointRoot: String): String = { + sourceIdentifyingName.nameOpt match { + case Some(name) => + // User-provided and flow-assigned names use named paths + s"$resolvedCheckpointRoot/sources/$name" + case None => + // Unassigned sources get sequential IDs assigned here + s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}" + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala index 599a48c642085..dc8da2d5a6286 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala @@ -146,7 +146,7 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest { ), userSpecifiedSchema = Option(catalogTable.schema), catalogTable = Option(catalogTable), - userSpecifiedStreamingSourceName = Some(Unassigned) + userSpecifiedStreamingSourceName = None ), sourceName = s"FileSource[${catalogTable.location.toString}]", output = Seq(idAttr) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala index caf6c14c8000c..c89895035c91a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala @@ -43,10 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession { assert(streamingRelation.get.sourceIdentifyingName == Unassigned, s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}") - // Verify the DataSource has the sourceIdentifyingName set + // Verify the DataSource has no user-specified source name (None means no user-provided name) val dsSourceName = 
streamingRelation.get.dataSource.userSpecifiedStreamingSourceName - assert(dsSourceName == Some(Unassigned), - s"Expected Some(Unassigned) but got $dsSourceName") + assert(dsSourceName.isEmpty, + s"Expected None but got $dsSourceName") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala deleted file mode 100644 index 973e2f75439eb..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming.test - -import org.scalatest.Tag - -import org.apache.spark.sql._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.StreamTest - -/** - * Test suite for streaming source naming and validation. - * Tests cover the naming API, validation rules, and resolution pipeline. 
- */ -class StreamingQueryEvolutionSuite extends StreamTest { - - // ==================== - // Name Validation Tests - // ==================== - - testWithSourceEvolution("invalid source name - contains hyphen") { - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("my-source") - .load() - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", - parameters = Map("sourceName" -> "my-source")) - } - - testWithSourceEvolution("invalid source name - contains space") { - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("my source") - .load() - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", - parameters = Map("sourceName" -> "my source")) - } - - testWithSourceEvolution("invalid source name - contains dot") { - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("my.source") - .load() - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", - parameters = Map("sourceName" -> "my.source")) - } - - testWithSourceEvolution("invalid source name - contains special characters") { - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("my.source@123") - .load() - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", - parameters = Map("sourceName" -> "my.source@123")) - } - - testWithSourceEvolution("valid source names - various patterns") { - // Test that valid names work correctly - Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", "123source") - .foreach { name => - val df = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name(name) - .load() - assert(df.isStreaming, s"DataFrame should be streaming for name: $name") - } - } - - testWithSourceEvolution("method chaining - name() returns reader for chaining") { - val df = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("my_source") - .option("opt1", "value1") - .load() - - assert(df.isStreaming, "DataFrame should be streaming") - } - - // ========================== - // Duplicate Detection Tests - // ========================== - - testWithSourceEvolution("duplicate source names - rejected when starting stream") { - withTempDir { checkpointDir => - val df1 = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("duplicate_name") - .load() - - val df2 = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("duplicate_name") // Same name - should fail - .load() - - checkError( - exception = intercept[AnalysisException] { - df1.union(df2).writeStream - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .start() - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES", - parameters = Map("names" -> "'duplicate_name'")) - } - } - - testWithSourceEvolution("enforcement enabled - unnamed source rejected") { - checkError( - exception = intercept[AnalysisException] { - spark.readStream - .format("org.apache.spark.sql.streaming.test") - .load() // Unnamed - throws error at load() time - }, - condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT", - parameters = Map("sourceInfo" -> ".*"), - matchPVals = true) - } - - 
testWithSourceEvolution("enforcement enabled - all sources named succeeds") { - val df1 = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("alpha") - .load() - - val df2 = spark.readStream - .format("org.apache.spark.sql.streaming.test") - .name("beta") - .load() - - // Should not throw - all sources are named - val union = df1.union(df2) - assert(union.isStreaming, "Union should be streaming") - } - - // ============== - // Helper Methods - // ============== - - /** - * Helper method to run tests with source evolution enabled. - */ - def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = { - test(testName, testTags: _*) { - withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { - testBody - } - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala new file mode 100644 index 0000000000000..d4b838ab5ab4e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala @@ -0,0 +1,581 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming.test + +import scala.concurrent.duration._ + +import org.apache.hadoop.fs.Path +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterEach, Tag} + +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} +import org.apache.spark.sql.streaming.Trigger._ +import org.apache.spark.util.Utils + +/** + * Test suite for streaming source naming and validation. + * Tests cover the naming API, validation rules, and resolution pipeline. + */ +class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + override def afterEach(): Unit = { + spark.streams.active.foreach(_.stop()) + super.afterEach() + } + + /** + * Helper to verify that a source was created with the expected metadata path. 
+ * @param checkpointLocation the checkpoint location path + * @param sourcePath the expected source path (e.g., "source1" or "0") + * @param mode mockito verification mode (default: times(1)) + */ + private def verifySourcePath( + checkpointLocation: Path, + sourcePath: String, + mode: org.mockito.verification.VerificationMode = times(1)): Unit = { + verify(LastOptions.mockStreamSourceProvider, mode).createSource( + any(), + meq(s"${new Path(makeQualifiedPath( + checkpointLocation.toString)).toString}/sources/$sourcePath"), + meq(None), + meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) + } + + // ==================== + // Name Validation Tests + // ==================== + + testWithSourceEvolution("invalid source name - contains hyphen") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("my-source") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", + parameters = Map("sourceName" -> "my-source")) + } + + testWithSourceEvolution("invalid source name - contains space") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("my source") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", + parameters = Map("sourceName" -> "my source")) + } + + testWithSourceEvolution("invalid source name - contains dot") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("my.source") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", + parameters = Map("sourceName" -> "my.source")) + } + + testWithSourceEvolution("invalid source name - contains special characters") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("my.source@123") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.INVALID_SOURCE_NAME", + parameters = Map("sourceName" -> "my.source@123")) + } + + testWithSourceEvolution("valid source names - various patterns") { + // Test that valid names work correctly + Seq("mySource", "my_source", "MySource123", "_private", "source_123_test", "123source") + .foreach { name => + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name(name) + .load() + assert(df.isStreaming, s"DataFrame should be streaming for name: $name") + } + } + + testWithSourceEvolution("method chaining - name() returns reader for chaining") { + val df = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("my_source") + .option("opt1", "value1") + .load() + + assert(df.isStreaming, "DataFrame should be streaming") + } + + // ========================== + // Duplicate Detection Tests + // ========================== + + testWithSourceEvolution("duplicate source names - rejected when starting stream") { + withTempDir { checkpointDir => + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("duplicate_name") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("duplicate_name") // Same name - should fail + .load() + + checkError( + exception = intercept[AnalysisException] { + df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start() + }, + condition = 
"STREAMING_QUERY_EVOLUTION_ERROR.DUPLICATE_SOURCE_NAMES", + parameters = Map("names" -> "'duplicate_name'")) + } + } + + testWithSourceEvolution("enforcement enabled - unnamed source rejected") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() // Unnamed - throws error at load() time + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.UNNAMED_STREAMING_SOURCES_WITH_ENFORCEMENT", + parameters = Map("sourceInfo" -> ".*"), + matchPVals = true) + } + + testWithSourceEvolution("enforcement enabled - all sources named succeeds") { + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("alpha") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("beta") + .load() + + // Should not throw - all sources are named + val union = df1.union(df2) + assert(union.isStreaming, "Union should be streaming") + } + + test("without enforcement - naming sources throws error") { + withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("mySource") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.SOURCE_NAMING_NOT_SUPPORTED", + parameters = Map("name" -> "mySource")) + } + } + + // ======================= + // Metadata Path Tests + // ======================= + + testWithSourceEvolution("named sources - metadata path uses source name") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + verifySourcePath(checkpointLocation, "source1") + verifySourcePath(checkpointLocation, "source2") + } + + test("unnamed sources use positional IDs for metadata path") { + withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + // Without naming, sources get sequential IDs (Unassigned -> 0, 1, ...) 
+ verifySourcePath(checkpointLocation, "0") + verifySourcePath(checkpointLocation, "1") + } + } + + // ======================== + // Source Evolution Tests + // ======================== + + testWithSourceEvolution("source evolution - reorder sources with named sources") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // First query: source1 then source2 + val df1a = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2a = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q1 = df1a.union(df2a).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + LastOptions.clear() + + // Second query: source2 then source1 (reordered) - should still work + val df1b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q2 = df2b.union(df1b).writeStream // Note: reversed order + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.processAllAvailable() + q2.stop() + + // Both sources should still use their named paths + verifySourcePath(checkpointLocation, "source1", atLeastOnce()) + verifySourcePath(checkpointLocation, "source2", atLeastOnce()) + } + + testWithSourceEvolution("source evolution - add new source with named sources") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // First query: only source1 + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + LastOptions.clear() + + // Second query: add source2 + val df1b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q2 = df1b.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.processAllAvailable() + q2.stop() + + // Both sources should have been created + verifySourcePath(checkpointLocation, "source1", atLeastOnce()) + verifySourcePath(checkpointLocation, "source2") + } + + testWithSourceEvolution("named sources enforcement uses V2 offset log format") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeqLog} + val offsetLog = 
new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = offsetLog.get(0) + assert(offsetSeq.isDefined, "Offset log should have batch 0") + assert(offsetSeq.get.isInstanceOf[OffsetMap], + s"Expected OffsetMap but got ${offsetSeq.get.getClass.getSimpleName}") + } + + testWithSourceEvolution("names preserved through union operations") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("alpha") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("beta") + .load() + + val df3 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("gamma") + .load() + + // Complex union: (alpha union beta) union gamma + val q = df1.union(df2).union(df3).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + // All three sources should use their named paths + verifySourcePath(checkpointLocation, "alpha") + verifySourcePath(checkpointLocation, "beta") + verifySourcePath(checkpointLocation, "gamma") + } + + testWithSourceEvolution("enforcement config is persisted in offset metadata") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // Start query with enforcement enabled + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + // Verify config was persisted in offset metadata + import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog + val offsetLog = new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = offsetLog.get(0) + assert(offsetSeq.isDefined, "Offset log should have batch 0") + assert(offsetSeq.get.metadataOpt.isDefined, "Offset metadata should be present") + assert(offsetSeq.get.metadataOpt.get.conf.contains( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key), + "ENABLE_STREAMING_SOURCE_EVOLUTION should be in offset metadata") + assert(offsetSeq.get.metadataOpt.get.conf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "true", + "ENABLE_STREAMING_SOURCE_EVOLUTION should be true in offset metadata") + } + + test("config mismatch detected when restarting with different enforcement mode") { + // Start query WITHOUT enforcement (unnamed sources) + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + + LastOptions.clear() + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() // No .name() call + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + // Verify enforcement=false was persisted + import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog + val offsetLog = new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = 
offsetLog.get(0) + assert(offsetSeq.isDefined) + assert(offsetSeq.get.metadataOpt.get.conf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "false", + "ENABLE_STREAMING_SOURCE_EVOLUTION should be false in checkpoint") + + // Try to restart with enforcement ENABLED - should fail with config mismatch + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + + LastOptions.clear() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") // Must be named when enforcement=true + .load() + + val e = intercept[StreamingQueryException] { + val q2 = df2.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.awaitTermination() + } + + checkError( + exception = e.getCause.asInstanceOf[SparkIllegalArgumentException], + condition = "STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH", + parameters = Map("sessionValue" -> "true", "checkpointValue" -> "false")) + } + } + } + + test("upgrade from old checkpoint without enforcement config uses default value") { + import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetSeqMetadata, OffsetSeqMetadataV2} + + // Simulate old checkpoint metadata without ENABLE_STREAMING_SOURCE_EVOLUTION + val oldMetadata = OffsetSeqMetadata( + batchWatermarkMs = 0, + batchTimestampMs = 0, + conf = Map( + // Old checkpoint has other configs but not ENABLE_STREAMING_SOURCE_EVOLUTION + SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max" + ) + ) + + // Verify that reading the config returns the default value (false) + val value = OffsetSeqMetadata.readValueOpt( + oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + assert(value.contains("false"), + s"Expected default value 'false' for missing config, but got: $value") + + // Also test with V2 metadata + val oldMetadataV2 = OffsetSeqMetadataV2( + batchWatermarkMs = 0, + batchTimestampMs = 0, + conf = Map( + SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max" + ) + ) + + val valueV2 = OffsetSeqMetadata.readValueOpt( + oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + assert(valueV2.contains("false"), + s"Expected default value 'false' for missing config in V2, but got: $valueV2") + } + + // ============== + // Helper Methods + // ============== + + /** + * Helper method to run tests with source evolution enabled. + * Sets offset log format to V2 (OffsetMap) since named sources require it. + */ + def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = { + test(testName, testTags: _*) { + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + testBody + } + } + } +}
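
Not part of the patch itself: a minimal, spark-shell style sketch of the user-facing flow the change enables, assuming a build that includes this diff. DataStreamReader.name(), SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION, and SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION are the APIs exercised by the tests above; the "rate"/"console" formats, the "orders"/"clicks" names, and the /tmp checkpoint path are illustrative stand-ins.

import org.apache.spark.sql.internal.SQLConf

// Opt in to source evolution; the tests above pair it with the V2 (OffsetMap) offset log format.
spark.conf.set(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key, "true")
spark.conf.set(SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key, "2")

// With enforcement on, every streaming source must carry a stable, user-provided name.
val orders = spark.readStream.format("rate").name("orders").load()
val clicks = spark.readStream.format("rate").name("clicks").load()

val query = orders.union(clicks).writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/source-evolution-sketch")
  .start()

// Source metadata is now checkpointed under <checkpoint>/sources/orders and
// <checkpoint>/sources/clicks instead of .../sources/0 and .../sources/1, so the
// query can be restarted with the sources reordered or with a new named source added.
query.stop()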
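
A self-contained toy mirror of the resolution rule in StreamingUtils.getMetadataPath and of the sourceIdMap keying in MicroBatchExecution, runnable outside the Spark tree. The trait and case classes below are local stand-ins for org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName and its cases, not the real types.

import java.util.concurrent.atomic.AtomicLong

object MetadataPathSketch extends App {
  // Local stand-ins for UserProvided / FlowAssigned / Unassigned.
  sealed trait SourceName { def nameOpt: Option[String] }
  case class UserProvided(name: String) extends SourceName { def nameOpt: Option[String] = Some(name) }
  case class FlowAssigned(name: String) extends SourceName { def nameOpt: Option[String] = Some(name) }
  case object Unassigned extends SourceName { def nameOpt: Option[String] = None }

  // Mirrors getMetadataPath: named sources get stable name-based paths,
  // unassigned sources consume the next positional ID.
  def metadataPath(name: SourceName, nextId: AtomicLong, root: String): String =
    name.nameOpt match {
      case Some(n) => s"$root/sources/$n"
      case None => s"$root/sources/${nextId.getAndIncrement()}"
    }

  val nextId = new AtomicLong(0L)
  assert(metadataPath(UserProvided("orders"), nextId, "/ckpt") == "/ckpt/sources/orders")
  assert(metadataPath(FlowAssigned("flow_1"), nextId, "/ckpt") == "/ckpt/sources/flow_1")
  assert(metadataPath(Unassigned, nextId, "/ckpt") == "/ckpt/sources/0")
  assert(metadataPath(Unassigned, nextId, "/ckpt") == "/ckpt/sources/1")

  // Mirrors the sourceIdMap keying: names when evolution is enforced,
  // positional indices for backward compatibility otherwise.
  val sources = Seq("sourceA" -> UserProvided("orders"), "sourceB" -> UserProvided("clicks"))
  val byName = sources.map { case (src, n) => n.nameOpt.get -> src }.toMap
  val byIndex = sources.map(_._1).zipWithIndex.map { case (src, i) => i.toString -> src }.toMap
  assert(byName == Map("orders" -> "sourceA", "clicks" -> "sourceB"))
  assert(byIndex == Map("0" -> "sourceA", "1" -> "sourceB"))
}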
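
And a small sketch of the restart-time guard added in StreamExecution: the session value of the evolution flag must match what the checkpoint's offset metadata recorded, with a missing entry defaulting to "false" for checkpoints written before the flag existed. The key string below is the one quoted in the new CONFIG_MISMATCH message; the helper itself is a simplified stand-in, not the Spark code path.

object ConfigMismatchSketch extends App {
  // Key string as it appears in the CONFIG_MISMATCH error message above.
  val EvolutionKey = "spark.sql.streaming.queryEvolution.enableSourceEvolution"

  // Simplified stand-in for the check in StreamExecution: compare session vs. checkpoint,
  // treating an absent entry as "false" (pre-upgrade checkpoints).
  def validate(sessionEnabled: Boolean, checkpointConf: Map[String, String]): Unit = {
    val checkpointValue = checkpointConf.getOrElse(EvolutionKey, "false")
    require(sessionEnabled.toString == checkpointValue,
      s"CONFIG_MISMATCH: session=$sessionEnabled, checkpoint=$checkpointValue")
  }

  validate(sessionEnabled = false, checkpointConf = Map.empty)                  // old checkpoint, defaults agree
  validate(sessionEnabled = true, checkpointConf = Map(EvolutionKey -> "true")) // explicit match
  // validate(sessionEnabled = true, checkpointConf = Map.empty)                // would fail: true vs default "false"
}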