From 061e56fb6a9446a179f99ccc34ac1f411585ec06 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 18 Feb 2026 16:27:34 -0800 Subject: [PATCH 1/8] [SPARK][SQL] Centralize streaming source identifying name in DataSource Move streamingSourceIdentifyingName from being passed explicitly through StreamingRelation constructors to being derived from DataSource itself via a lazy val, removing the overloaded StreamingRelation.apply and eliminating scattered Unassigned defaults at call sites. --- .../catalyst/analysis/ResolveDataSource.scala | 17 +++++++++-------- .../sql/execution/datasources/DataSource.scala | 5 ++++- .../streaming/runtime/StreamingRelation.scala | 11 +---------- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala index 35ff39ce82682..a1e4f2ac96b7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala @@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedDataSource} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned} +import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.classic.SparkSession @@ -84,10 +84,11 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = source, - options = optionsWithPath.originalMap) + options = optionsWithPath.originalMap, + userSpecifiedStreamingSourceName = Some(sourceIdentifyingName)) val v1Relation = ds match { case _: StreamSourceProvider => - Some(StreamingRelation(v1DataSource, sourceIdentifyingName)) + Some(StreamingRelation(v1DataSource)) case _ => None } ds match { @@ -112,16 +113,16 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { StreamingRelationV2( Some(provider), source, table, dsOptions, toAttributes(table.columns.asSchema), None, None, v1Relation, - sourceIdentifyingName) + v1DataSource.streamingSourceIdentifyingName) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. - case _ => StreamingRelation(v1DataSource, sourceIdentifyingName) + case _ => StreamingRelation(v1DataSource) } case _ => // Code path for data source v1. 
- StreamingRelation(v1DataSource, sourceIdentifyingName) + StreamingRelation(v1DataSource) } case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths) => @@ -148,7 +149,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { options = optionsWithPath.originalMap) val v1Relation = ds match { case _: StreamSourceProvider => - Some(StreamingRelation(v1DataSource, Unassigned)) + Some(StreamingRelation(v1DataSource)) case _ => None } ds match { @@ -173,7 +174,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { StreamingRelationV2( Some(provider), source, table, dsOptions, toAttributes(table.columns.asSchema), None, None, v1Relation, - Unassigned) + v1DataSource.streamingSourceIdentifyingName) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a48c71ad0d362..0a56f219f98b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName +import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils} import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.classic.Dataset @@ -110,6 +110,9 @@ case class DataSource( private val conf: SQLConf = getSqlConf(sparkSession) + lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName = + userSpecifiedStreamingSourceName.getOrElse(Unassigned) + lazy val providingClass: Class[_] = { val cls = DataSource.lookupDataSource(className, conf) // `providingClass` is used for resolving data source relation for catalog tables. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala index 232f3475bb1bd..efa65bc5ead49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamingRelation.scala @@ -36,18 +36,9 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { // Extract source identifying name from DataSource for stable checkpoints - val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned) StreamingRelation( dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema), - sourceIdentifyingName) - } - - def apply( - dataSource: DataSource, - sourceIdentifyingName: StreamingSourceIdentifyingName): StreamingRelation = { - StreamingRelation( - dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema), - sourceIdentifyingName) + dataSource.streamingSourceIdentifyingName) } } From 9ce2f1b37c9ab3c594b6408f167af2a385fcbf42 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 18 Feb 2026 17:20:14 -0800 Subject: [PATCH 2/8] [SPARK][SQL] Hook streaming source identifying name into metadata paths and sourceIdMap - Centralize streamingSourceIdentifyingName in DataSource as a lazy val (UserProvided/FlowAssigned -> named, Unassigned -> positional fallback) - Remove overloaded StreamingRelation.apply; single apply delegates to DataSource.streamingSourceIdentifyingName - Pass userSpecifiedStreamingSourceName through DataSource in ResolveDataSource so all StreamingRelation/V2 call sites use the single-arg apply - In MicroBatchExecution, use MicroBatchExecution.getMetadataPath to derive source metadata paths from sourceIdentifyingName rather than hard-coded positional IDs - When ENABLE_STREAMING_SOURCE_EVOLUTION is on, build sourceIdMap with user-provided names; otherwise fall back to positional indices - Pass sourceIdentifyingName through to StreamingDataSourceV2Relation --- .../runtime/MicroBatchExecution.scala | 84 +++++++++++++++---- 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index a4f50993f64ba..6f0cc53e54a12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.streaming.runtime +import java.util.concurrent.atomic.AtomicLong + import scala.collection.mutable.{Map => MutableMap} import scala.collection.mutable import scala.jdk.CollectionConverters._ @@ -30,7 +32,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, 
LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} -import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream} +import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned, UserProvided, WriteToStream} import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.{Dataset, SparkSession} @@ -168,7 +170,7 @@ class MicroBatchExecution( assert(queryExecutionThread eq Thread.currentThread, "logicalPlan must be initialized in QueryExecutionThread " + s"but the current thread was ${Thread.currentThread}") - var nextSourceId = 0L + val nextSourceId = new AtomicLong(0L) val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]() val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]() val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]() @@ -183,15 +185,17 @@ class MicroBatchExecution( val disabledSources = Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders) + val enforceNamed = sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution + import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ val _logicalPlan = analyzedPlan.transform { case streamingRelation @ StreamingRelation( dataSourceV1, sourceName, output, sourceIdentifyingName) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val metadataPath = MicroBatchExecution.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = dataSourceV1.createSource(metadataPath) - nextSourceId += 1 logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " + log"from DataSourceV1 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " + log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]") @@ -206,8 +210,8 @@ class MicroBatchExecution( if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) { v2ToRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - nextSourceId += 1 + val metadataPath = MicroBatchExecution.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " + log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " + log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}") @@ -224,7 +228,8 @@ class MicroBatchExecution( trigger match { case RealTimeTrigger(duration) => Some(duration) case _ => None - } + }, + sourceIdentifyingName ) StreamingDataSourceV2ScanRelation(relation, scan, output, stream) }) @@ -234,10 +239,10 @@ class MicroBatchExecution( } else { v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" + val metadataPath = MicroBatchExecution.getMetadataPath( + sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath) - nextSourceId += 1 logInfo(log"Using 
Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] from " + log"DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " + log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}") @@ -247,17 +252,36 @@ class MicroBatchExecution( }) } } - sources = _logicalPlan.collect { + + // Extract sources and their sourceIdentifyingName for sourceIdMap mapping + val sourcesWithNames = _logicalPlan.collect { // v1 source - case s: StreamingExecutionRelation => s.source + case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName) // v2 source - case r: StreamingDataSourceV2ScanRelation => r.stream + case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName) + } + sources = sourcesWithNames.map(_._1) + + if (enforceNamed) { + // When enforcement is enabled, all sources should be named after validation in analysis. + // This assertion ensures that the validation in NameStreamingSources worked correctly. + assert(sourcesWithNames.forall(s => s._2 != Unassigned), + "All sources should be named at this point - validation should have happened in analysis") + + // Create source ID mapping using names (for OffsetMap format) + sourceIdMap = sourcesWithNames.map { + case (source, UserProvided(name)) => name -> source + case (source, FlowAssigned(name)) => name -> source + case (source, Unassigned) => + throw new IllegalStateException( + "Unassigned sources should not exist when enforcement is enabled") + }.toMap + } else { + // When enforcement is disabled, use positional indices (backward compatibility) + sourceIdMap = sources.zipWithIndex.map { + case (source, index) => index.toString -> source + }.toMap } - - // Create source ID mapping for OffsetMap support - sourceIdMap = sources.zipWithIndex.map { - case (source, index) => index.toString -> source - }.toMap // Inform the source if it is in real time mode if (trigger.isInstanceOf[RealTimeTrigger]) { @@ -1414,6 +1438,32 @@ class MicroBatchExecution( object MicroBatchExecution { val BATCH_ID_KEY = "streaming.sql.batchId" + + /** + * Computes the metadata path for a streaming source based on its identifying name. + * + * @param sourceIdentifyingName The source's identifying name + * (UserProvided, FlowAssigned, or Unassigned) + * @param nextSourceId AtomicLong tracking the next positional source ID + * @param resolvedCheckpointRoot The resolved checkpoint root path + * @return The computed metadata path string + */ + def getMetadataPath( + sourceIdentifyingName: StreamingSourceIdentifyingName, + nextSourceId: AtomicLong, + resolvedCheckpointRoot: String): String = { + sourceIdentifyingName match { + case UserProvided(name) => + // User-provided names always use named paths + s"$resolvedCheckpointRoot/sources/$name" + case FlowAssigned(name) => + // Flow-assigned names use named paths + s"$resolvedCheckpointRoot/sources/$name" + case Unassigned => + // Unassigned sources get sequential IDs assigned here + s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}" + } + } } case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode { From ee09cfbc0b591848a7f215f6c958058c64dc594e Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 18 Feb 2026 17:26:38 -0800 Subject: [PATCH 3/8] [SPARK][SQL][TESTS] Add integration tests for streaming source naming and metadata paths Add tests covering: - Named sources produce named metadata paths (sources/) - Unnamed sources fall back to positional IDs (sources/0, sources/1, ...) 
- Source evolution: reordering and adding sources with named sources - Named source enforcement requires V2 (OffsetMap) offset log format - Names are preserved correctly through multi-level union operations - Naming sources without enforcement enabled raises an error Also adds BeforeAndAfterEach, verifySourcePath helper (mockito), and updates testWithSourceEvolution to set STREAMING_OFFSET_LOG_FORMAT_VERSION=2. --- .../test/StreamingQueryEvolutionSuite.scala | 283 +++++++++++++++++- 1 file changed, 280 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala index 973e2f75439eb..a809d359b991d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala @@ -17,17 +17,52 @@ package org.apache.spark.sql.streaming.test -import org.scalatest.Tag +import scala.concurrent.duration._ + +import org.apache.hadoop.fs.Path +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterEach, Tag} import org.apache.spark.sql._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.Trigger._ +import org.apache.spark.util.Utils /** * Test suite for streaming source naming and validation. * Tests cover the naming API, validation rules, and resolution pipeline. */ -class StreamingQueryEvolutionSuite extends StreamTest { +class StreamingQueryEvolutionSuite extends StreamTest with BeforeAndAfterEach { + import testImplicits._ + + private def newMetadataDir = + Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + override def afterEach(): Unit = { + spark.streams.active.foreach(_.stop()) + super.afterEach() + } + + /** + * Helper to verify that a source was created with the expected metadata path. 
+ * @param checkpointLocation the checkpoint location path + * @param sourcePath the expected source path (e.g., "source1" or "0") + * @param mode mockito verification mode (default: times(1)) + */ + private def verifySourcePath( + checkpointLocation: Path, + sourcePath: String, + mode: org.mockito.verification.VerificationMode = times(1)): Unit = { + verify(LastOptions.mockStreamSourceProvider, mode).createSource( + any(), + meq(s"${new Path(makeQualifiedPath( + checkpointLocation.toString)).toString}/sources/$sourcePath"), + meq(None), + meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) + } // ==================== // Name Validation Tests @@ -159,16 +194,258 @@ class StreamingQueryEvolutionSuite extends StreamTest { assert(union.isStreaming, "Union should be streaming") } + test("without enforcement - naming sources throws error") { + withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + checkError( + exception = intercept[AnalysisException] { + spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("mySource") + .load() + }, + condition = "STREAMING_QUERY_EVOLUTION_ERROR.SOURCE_NAMING_NOT_SUPPORTED", + parameters = Map("name" -> "mySource")) + } + } + + // ======================= + // Metadata Path Tests + // ======================= + + testWithSourceEvolution("named sources - metadata path uses source name") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + verifySourcePath(checkpointLocation, "source1") + verifySourcePath(checkpointLocation, "source2") + } + + test("unnamed sources use positional IDs for metadata path") { + withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + // Without naming, sources get sequential IDs (Unassigned -> 0, 1, ...) 
+ verifySourcePath(checkpointLocation, "0") + verifySourcePath(checkpointLocation, "1") + } + } + + // ======================== + // Source Evolution Tests + // ======================== + + testWithSourceEvolution("source evolution - reorder sources with named sources") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // First query: source1 then source2 + val df1a = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2a = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q1 = df1a.union(df2a).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + LastOptions.clear() + + // Second query: source2 then source1 (reordered) - should still work + val df1b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q2 = df2b.union(df1b).writeStream // Note: reversed order + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.processAllAvailable() + q2.stop() + + // Both sources should still use their named paths + verifySourcePath(checkpointLocation, "source1", atLeastOnce()) + verifySourcePath(checkpointLocation, "source2", atLeastOnce()) + } + + testWithSourceEvolution("source evolution - add new source with named sources") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // First query: only source1 + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + LastOptions.clear() + + // Second query: add source2 + val df1b = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q2 = df1b.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.processAllAvailable() + q2.stop() + + // Both sources should have been created + verifySourcePath(checkpointLocation, "source1", atLeastOnce()) + verifySourcePath(checkpointLocation, "source2") + } + + testWithSourceEvolution("named sources enforcement uses V2 offset log format") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source2") + .load() + + val q = df1.union(df2).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetMap, OffsetSeqLog} + val offsetLog = 
new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = offsetLog.get(0) + assert(offsetSeq.isDefined, "Offset log should have batch 0") + assert(offsetSeq.get.isInstanceOf[OffsetMap], + s"Expected OffsetMap but got ${offsetSeq.get.getClass.getSimpleName}") + } + + testWithSourceEvolution("names preserved through union operations") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("alpha") + .load() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("beta") + .load() + + val df3 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("gamma") + .load() + + // Complex union: (alpha union beta) union gamma + val q = df1.union(df2).union(df3).writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q.processAllAvailable() + q.stop() + + // All three sources should use their named paths + verifySourcePath(checkpointLocation, "alpha") + verifySourcePath(checkpointLocation, "beta") + verifySourcePath(checkpointLocation, "gamma") + } + // ============== // Helper Methods // ============== /** * Helper method to run tests with source evolution enabled. + * Sets offset log format to V2 (OffsetMap) since named sources require it. */ def testWithSourceEvolution(testName: String, testTags: Tag*)(testBody: => Any): Unit = { test(testName, testTags: _*) { - withSQLConf(SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true") { + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { testBody } } From 8d3d87cd8dd9c9137e3f200f09b1c2590efd5f7d Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 18 Feb 2026 17:29:55 -0800 Subject: [PATCH 4/8] Remove unused testImplicits import --- .../spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala index a809d359b991d..8ec7a6b0bac1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala @@ -35,7 +35,6 @@ import org.apache.spark.util.Utils * Tests cover the naming API, validation rules, and resolution pipeline. */ class StreamingQueryEvolutionSuite extends StreamTest with BeforeAndAfterEach { - import testImplicits._ private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath From f04c0e3128ee9f36c3eec68dd63b8b9ac5493a46 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 19 Feb 2026 13:56:44 -0800 Subject: [PATCH 5/8] [SPARK][SQL] Narrow userSpecifiedStreamingSourceName to Option[UserProvided] Change DataSource.userSpecifiedStreamingSourceName from Option[StreamingSourceIdentifyingName] to Option[UserProvided], preventing Unassigned/FlowAssigned from being passed and ensuring DLT flow context fallback always fires when no user-provided name exists. 
- Add toUserProvided helper to StreamingSourceIdentifyingName - Use sourceIdentifyingName.toUserProvided in ResolveDataSource and DataSourceStrategy - Update tests to pass None instead of Some(Unassigned) This fixes StreamingTableExternalLocationSuite failures where passing Some(Unassigned) short-circuited the DLT flow context fallback that assigns FlowAssigned names from checkpoints. --- .../catalyst/streaming/StreamingSourceIdentifyingName.scala | 5 +++++ .../spark/sql/catalyst/analysis/ResolveDataSource.scala | 2 +- .../apache/spark/sql/execution/datasources/DataSource.scala | 4 ++-- .../sql/execution/datasources/DataSourceStrategy.scala | 2 +- .../spark/sql/execution/streaming/StreamRelationSuite.scala | 2 +- .../sql/streaming/StreamingSourceIdentifyingNameSuite.scala | 6 +++--- 6 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala index b89aaf017b1da..3053bfac91f88 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala @@ -47,6 +47,11 @@ sealed trait StreamingSourceIdentifyingName { case FlowAssigned(name) => s"""name="$name"""" case Unassigned => "name=" } + + def toUserProvided: Option[UserProvided] = this match { + case up: UserProvided => Some(up) + case _ => None + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala index a1e4f2ac96b7e..2f139393ade38 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDataSource.scala @@ -85,7 +85,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { userSpecifiedSchema = userSpecifiedSchema, className = source, options = optionsWithPath.originalMap, - userSpecifiedStreamingSourceName = Some(sourceIdentifyingName)) + userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided) val v1Relation = ds match { case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 0a56f219f98b5..5acaef0392fe8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned} +import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned, UserProvided} import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils} import org.apache.spark.sql.classic.ClassicConversions.castToImpl import org.apache.spark.sql.classic.Dataset @@ 
-103,7 +103,7 @@ case class DataSource( bucketSpec: Option[BucketSpec] = None, options: Map[String, String] = Map.empty, catalogTable: Option[CatalogTable] = None, - userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None) + userSpecifiedStreamingSourceName: Option[UserProvided] = None) extends SessionStateHelper with Logging { case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String]) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 220b0b8a63013..63782c753696c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] userSpecifiedSchema = Some(table.schema), options = dsOptions, catalogTable = Some(table), - userSpecifiedStreamingSourceName = Some(sourceIdentifyingName)) + userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided) StreamingRelation(dataSource) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala index 599a48c642085..dc8da2d5a6286 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamRelationSuite.scala @@ -146,7 +146,7 @@ class StreamRelationSuite extends SharedSparkSession with AnalysisTest { ), userSpecifiedSchema = Option(catalogTable.schema), catalogTable = Option(catalogTable), - userSpecifiedStreamingSourceName = Some(Unassigned) + userSpecifiedStreamingSourceName = None ), sourceName = s"FileSource[${catalogTable.location.toString}]", output = Seq(idAttr) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala index caf6c14c8000c..c89895035c91a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSourceIdentifyingNameSuite.scala @@ -43,10 +43,10 @@ class StreamingSourceIdentifyingNameSuite extends SharedSparkSession { assert(streamingRelation.get.sourceIdentifyingName == Unassigned, s"Expected Unassigned but got ${streamingRelation.get.sourceIdentifyingName}") - // Verify the DataSource has the sourceIdentifyingName set + // Verify the DataSource has no user-specified source name (None means no user-provided name) val dsSourceName = streamingRelation.get.dataSource.userSpecifiedStreamingSourceName - assert(dsSourceName == Some(Unassigned), - s"Expected Some(Unassigned) but got $dsSourceName") + assert(dsSourceName.isEmpty, + s"Expected None but got $dsSourceName") } } From 1fe92b9ab54348ba9882d6b422180cf044a03f54 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 19 Feb 2026 16:42:30 -0800 Subject: [PATCH 6/8] [SPARK][SQL] Persist enforcement mode in checkpoint and refactor metadata path logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit addresses several improvements to streaming source evolution: 1. 
Persist ENABLE_STREAMING_SOURCE_EVOLUTION config in offset metadata - Added to relevantSQLConfs so it's automatically persisted - Ensures enforcement mode stays consistent across query restarts 2. Restore configs from checkpoint before logicalPlan evaluation - Moved config restoration to StreamExecution before logicalPlan is forced - Prevents reading stale session config when resuming queries 3. Refactor metadata path computation - Moved getMetadataPath from MicroBatchExecution to StreamingUtils - Added nameOpt helper to StreamingSourceIdentifyingName - Simplified pattern matching using the new helper 4. Rename test suite for clarity - StreamingQueryEvolutionSuite → StreamingSourceEvolutionSuite --- .../StreamingSourceIdentifyingName.scala | 6 +++ .../streaming/checkpointing/OffsetSeq.scala | 2 +- .../runtime/MicroBatchExecution.scala | 48 +++++-------------- .../streaming/runtime/StreamExecution.scala | 10 ++++ .../streaming/utils/StreamingUtils.scala | 27 +++++++++++ ...la => StreamingSourceEvolutionSuite.scala} | 2 +- 6 files changed, 57 insertions(+), 38 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/streaming/test/{StreamingQueryEvolutionSuite.scala => StreamingSourceEvolutionSuite.scala} (99%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala index 3053bfac91f88..1c50434d1a678 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala @@ -52,6 +52,12 @@ sealed trait StreamingSourceIdentifyingName { case up: UserProvided => Some(up) case _ => None } + + def nameOpt: Option[String] = this match { + case UserProvided(name) => Some(name) + case FlowAssigned(name) => Some(name) + case Unassigned => None + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index 23c266e828914..ea7680c74f1a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -203,7 +203,7 @@ object OffsetSeqMetadata extends Logging { STATE_STORE_ROCKSDB_FORMAT_VERSION, STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION, STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION, PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT, - STATE_STORE_ROW_CHECKSUM_ENABLED + STATE_STORE_ROW_CHECKSUM_ENABLED, ENABLE_STREAMING_SOURCE_EVOLUTION ) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 6f0cc53e54a12..7f54fba1ada36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.LogKeys._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, 
FileSourceMetadataAttribute, LocalTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark} -import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, StreamingRelationV2, StreamingSourceIdentifyingName, Unassigned, UserProvided, WriteToStream} +import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream} import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.classic.{Dataset, SparkSession} @@ -50,6 +50,7 @@ import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetH import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils, StateSchemaBroadcast, StateStoreErrors} +import org.apache.spark.sql.execution.streaming.utils.StreamingUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex import org.apache.spark.sql.streaming.Trigger @@ -193,7 +194,7 @@ class MicroBatchExecution( dataSourceV1, sourceName, output, sourceIdentifyingName) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch - val metadataPath = MicroBatchExecution.getMetadataPath( + val metadataPath = StreamingUtils.getMetadataPath( sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = dataSourceV1.createSource(metadataPath) logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " + @@ -210,7 +211,7 @@ class MicroBatchExecution( if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) { v2ToRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = MicroBatchExecution.getMetadataPath( + val metadataPath = StreamingUtils.getMetadataPath( sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " + log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " + @@ -239,7 +240,7 @@ class MicroBatchExecution( } else { v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch - val metadataPath = MicroBatchExecution.getMetadataPath( + val metadataPath = StreamingUtils.getMetadataPath( sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot) val source = v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath) @@ -269,12 +270,13 @@ class MicroBatchExecution( "All sources should be named at this point - validation should have happened in analysis") // Create source ID mapping using names (for OffsetMap format) - sourceIdMap = sourcesWithNames.map { - case (source, UserProvided(name)) => name -> source - case (source, FlowAssigned(name)) => name -> source - case (source, Unassigned) => - throw new IllegalStateException( - "Unassigned sources should not exist when enforcement is enabled") + sourceIdMap = sourcesWithNames.map { case (source, 
sourceIdentifyingName) => + sourceIdentifyingName.nameOpt match { + case Some(name) => name -> source + case None => + throw new IllegalStateException( + "Unassigned sources should not exist when enforcement is enabled") + } }.toMap } else { // When enforcement is disabled, use positional indices (backward compatibility) @@ -1438,32 +1440,6 @@ class MicroBatchExecution( object MicroBatchExecution { val BATCH_ID_KEY = "streaming.sql.batchId" - - /** - * Computes the metadata path for a streaming source based on its identifying name. - * - * @param sourceIdentifyingName The source's identifying name - * (UserProvided, FlowAssigned, or Unassigned) - * @param nextSourceId AtomicLong tracking the next positional source ID - * @param resolvedCheckpointRoot The resolved checkpoint root path - * @return The computed metadata path string - */ - def getMetadataPath( - sourceIdentifyingName: StreamingSourceIdentifyingName, - nextSourceId: AtomicLong, - resolvedCheckpointRoot: String): String = { - sourceIdentifyingName match { - case UserProvided(name) => - // User-provided names always use named paths - s"$resolvedCheckpointRoot/sources/$name" - case FlowAssigned(name) => - // Flow-assigned names use named paths - s"$resolvedCheckpointRoot/sources/$name" - case Unassigned => - // Unassigned sources get sequential IDs assigned here - s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}" - } - } } case class OffsetHolder(start: OffsetV2, end: Option[OffsetV2]) extends LeafNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala index 65c1226a85dff..f1604223e256a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala @@ -337,6 +337,16 @@ abstract class StreamExecution( } getLatestExecutionContext().updateStatusMessage("Initializing sources") + + // Restore relevant SQL configs from checkpoint before evaluating logicalPlan. + // This ensures configs like ENABLE_STREAMING_SOURCE_EVOLUTION are set correctly + // when the logical plan determines source naming strategy. 
+ offsetLog.getLatest().foreach { case (_, offsetSeq) => + offsetSeq.metadataOpt.foreach { metadata => + OffsetSeqMetadata.setSessionConf(metadata, sparkSessionForStream.sessionState.conf) + } + } + // force initialization of the logical plan so that the sources can be created logicalPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala index d2654ac943b2c..7dd83f5cd7ea0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala @@ -16,13 +16,40 @@ */ package org.apache.spark.sql.execution.streaming.utils +import java.util.concurrent.atomic.AtomicLong + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName + object StreamingUtils { def resolvedCheckpointLocation(hadoopConf: Configuration, checkpointLocation: String): String = { val checkpointPath = new Path(checkpointLocation) val fs = checkpointPath.getFileSystem(hadoopConf) checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString } + + /** + * Computes the metadata path for a streaming source based on its identifying name. + * + * @param sourceIdentifyingName The source's identifying name + * (UserProvided, FlowAssigned, or Unassigned) + * @param nextSourceId AtomicLong tracking the next positional source ID + * @param resolvedCheckpointRoot The resolved checkpoint root path + * @return The computed metadata path string + */ + def getMetadataPath( + sourceIdentifyingName: StreamingSourceIdentifyingName, + nextSourceId: AtomicLong, + resolvedCheckpointRoot: String): String = { + sourceIdentifyingName.nameOpt match { + case Some(name) => + // User-provided and flow-assigned names use named paths + s"$resolvedCheckpointRoot/sources/$name" + case None => + // Unassigned sources get sequential IDs assigned here + s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}" + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala similarity index 99% rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala index 8ec7a6b0bac1e..54657f9f36b07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.Utils * Test suite for streaming source naming and validation. * Tests cover the naming API, validation rules, and resolution pipeline. 
*/ -class StreamingQueryEvolutionSuite extends StreamTest with BeforeAndAfterEach { +class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { private def newMetadataDir = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath From 72213a6a0c1cf63ac4273c1d368acf2e5702334e Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 19 Feb 2026 16:52:10 -0800 Subject: [PATCH 7/8] renaming suite, adding conf --- .../StreamingSourceIdentifyingName.scala | 15 +++ .../streaming/checkpointing/OffsetSeq.scala | 3 +- .../streaming/utils/StreamingUtils.scala | 21 ++- .../test/StreamingSourceEvolutionSuite.scala | 125 ++++++++++++++++++ 4 files changed, 162 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala index 1c50434d1a678..74feaf7164a17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala @@ -48,11 +48,26 @@ sealed trait StreamingSourceIdentifyingName { case Unassigned => "name=" } + /** + * Extracts only user-provided names, filtering out flow-assigned or unassigned sources. + * Used when narrowing to explicitly user-specified names (e.g., when passing to DataSource). + * + * @return Some(UserProvided) if this is a user-provided name, None otherwise + */ def toUserProvided: Option[UserProvided] = this match { case up: UserProvided => Some(up) case _ => None } + /** + * Extracts the name string from named sources (UserProvided or FlowAssigned). + * Returns None for Unassigned sources. + * + * Useful for pattern matching when both UserProvided and FlowAssigned should be + * treated identically (e.g., when computing metadata paths or building sourceIdMap). 
+ * + * @return Some(name) for named sources, None for Unassigned + */ def nameOpt: Option[String] = this match { case UserProvided(name) => Some(name) case FlowAssigned(name) => Some(name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala index ea7680c74f1a1..5c94af3776e5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala @@ -251,7 +251,8 @@ object OffsetSeqMetadata extends Logging { PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true", STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow", STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false", - STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1" + STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1", + ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false" ) def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: ConfigEntry[T]): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala index 7dd83f5cd7ea0..8dfc1fd23b57d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/utils/StreamingUtils.scala @@ -24,6 +24,16 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName object StreamingUtils { + /** + * Resolves a checkpoint location to a fully qualified path. + * + * Converts relative or unqualified paths to fully qualified paths using the + * file system's URI and working directory. + * + * @param hadoopConf Hadoop configuration to access the file system + * @param checkpointLocation The checkpoint location path (may be relative or unqualified) + * @return Fully qualified checkpoint location URI as a string + */ def resolvedCheckpointLocation(hadoopConf: Configuration, checkpointLocation: String): String = { val checkpointPath = new Path(checkpointLocation) val fs = checkpointPath.getFileSystem(hadoopConf) @@ -33,9 +43,18 @@ object StreamingUtils { /** * Computes the metadata path for a streaming source based on its identifying name. * + * Named sources (UserProvided/FlowAssigned) use stable name-based paths, enabling + * source evolution (reordering, adding, or removing sources without breaking state). + * Unassigned sources use sequential IDs for backward compatibility. 
+ * + * Examples: + * - UserProvided("mySource") => "$checkpointRoot/sources/mySource" + * - FlowAssigned("source_1") => "$checkpointRoot/sources/source_1" + * - Unassigned => "$checkpointRoot/sources/0" (increments nextSourceId) + * * @param sourceIdentifyingName The source's identifying name * (UserProvided, FlowAssigned, or Unassigned) - * @param nextSourceId AtomicLong tracking the next positional source ID + * @param nextSourceId AtomicLong tracking the next positional source ID for Unassigned sources * @param resolvedCheckpointRoot The resolved checkpoint root path * @return The computed metadata path string */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala index 54657f9f36b07..745666f89e4f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala @@ -432,6 +432,131 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { verifySourcePath(checkpointLocation, "gamma") } + testWithSourceEvolution("enforcement config is persisted in offset metadata") { + LastOptions.clear() + + val checkpointLocation = new Path(newMetadataDir) + + // Start query with enforcement enabled + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .name("source1") + .load() + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + // Verify config was persisted in offset metadata + import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog + val offsetLog = new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = offsetLog.get(0) + assert(offsetSeq.isDefined, "Offset log should have batch 0") + assert(offsetSeq.get.metadataOpt.isDefined, "Offset metadata should be present") + assert(offsetSeq.get.metadataOpt.get.conf.contains( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key), + "ENABLE_STREAMING_SOURCE_EVOLUTION should be in offset metadata") + assert(offsetSeq.get.metadataOpt.get.conf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "true", + "ENABLE_STREAMING_SOURCE_EVOLUTION should be true in offset metadata") + } + + test("enforcement config is restored from checkpoint on query restart") { + // Start query WITHOUT enforcement (unnamed sources) + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + + LastOptions.clear() + val checkpointLocation = new Path(newMetadataDir) + + val df1 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() // No .name() call + + val q1 = df1.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q1.processAllAvailable() + q1.stop() + + // Verify enforcement=false was persisted + import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqLog + val offsetLog = new OffsetSeqLog(spark, + makeQualifiedPath(checkpointLocation.toString).toString + "/offsets") + val offsetSeq = offsetLog.get(0) + assert(offsetSeq.isDefined) + 
assert(offsetSeq.get.metadataOpt.get.conf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "false", + "ENABLE_STREAMING_SOURCE_EVOLUTION should be false in checkpoint") + + // Restart with enforcement ENABLED in session config + // Checkpoint should restore enforcement=false, allowing unnamed sources + withSQLConf( + SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", + SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { + + LastOptions.clear() + + val df2 = spark.readStream + .format("org.apache.spark.sql.streaming.test") + .load() // Still unnamed + + val q2 = df2.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.processAllAvailable() + + // Verify the session config was overridden by checkpoint metadata + assert(!spark.sessionState.conf.enableStreamingSourceEvolution, + "Session config should be restored from checkpoint to false") + + q2.stop() + } + } + } + + test("upgrade from old checkpoint without enforcement config uses default value") { + import org.apache.spark.sql.execution.streaming.checkpointing.{OffsetSeqMetadata, OffsetSeqMetadataV2} + + // Simulate old checkpoint metadata without ENABLE_STREAMING_SOURCE_EVOLUTION + val oldMetadata = OffsetSeqMetadata( + batchWatermarkMs = 0, + batchTimestampMs = 0, + conf = Map( + // Old checkpoint has other configs but not ENABLE_STREAMING_SOURCE_EVOLUTION + SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max" + ) + ) + + // Verify that reading the config returns the default value (false) + val value = OffsetSeqMetadata.readValueOpt(oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + assert(value.contains("false"), + s"Expected default value 'false' for missing config, but got: $value") + + // Also test with V2 metadata + val oldMetadataV2 = OffsetSeqMetadataV2( + batchWatermarkMs = 0, + batchTimestampMs = 0, + conf = Map( + SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key -> "max" + ) + ) + + val valueV2 = OffsetSeqMetadata.readValueOpt(oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + assert(valueV2.contains("false"), + s"Expected default value 'false' for missing config in V2, but got: $valueV2") + } + // ============== // Helper Methods // ============== From 94a1d48f72d7045e14f367f3e45cbe03d1987cbc Mon Sep 17 00:00:00 2001 From: ericm-db Date: Thu, 19 Feb 2026 18:47:15 -0800 Subject: [PATCH 8/8] [SPARK][SQL] Add config mismatch validation and enhance checkpoint persistence This commit improves streaming source evolution by implementing defense in depth: 1. Persist enforcement config in checkpoint metadata - Added ENABLE_STREAMING_SOURCE_EVOLUTION to relevantSQLConfs - Added default value "false" for backward compatibility with upgrades - Config is now stored in offset metadata for consistency 2. Validate config mismatch on query restart - Detects when session config differs from checkpoint config - Throws STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH - Prevents inconsistent query behavior from config drift 3. Add helper methods and documentation - Added nameOpt to StreamingSourceIdentifyingName for cleaner pattern matching - Documented toUserProvided and nameOpt methods - Enhanced documentation for StreamingUtils methods 4. Refactor metadata path logic - Moved getMetadataPath from MicroBatchExecution to StreamingUtils - Better organization and reusability 5. 
Add comprehensive tests - Test config persistence in offset metadata - Test config mismatch detection on restart - Test backward compatibility with old checkpoints Defense in depth approach: - Fast fail at analysis time (current session config validation) - Fail on checkpoint mismatch at query start (new validation) --- .../resources/error/error-conditions.json | 5 +++ .../sql/errors/QueryExecutionErrors.scala | 10 +++++ .../streaming/runtime/StreamExecution.scala | 14 +++++++ .../test/StreamingSourceEvolutionSuite.scala | 40 ++++++++++--------- 4 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 24a12248ce725..feb5683e272ec 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6093,6 +6093,11 @@
       "Streaming query evolution error:"
     ],
     "subClass" : {
+      "CONFIG_MISMATCH" : {
+        "message" : [
+          "Configuration mismatch detected when resuming streaming query. The session config for spark.sql.streaming.queryEvolution.enableSourceEvolution is '<sessionValue>', but the checkpoint was created with '<checkpointValue>'. Please set the session config to match the checkpoint value to ensure consistent query behavior."
+        ]
+      },
       "DUPLICATE_SOURCE_NAMES" : {
         "message" : [
           "Duplicate streaming source names detected: . Each streaming source must have a unique name."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 2ce43cf857a93..7b2405ad01918 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1832,6 +1832,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "operation" -> operation))
   }
 
+  def streamingSourceEvolutionConfigMismatchError(
+      sessionValue: String,
+      checkpointValue: String): SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH",
+      messageParameters = Map(
+        "sessionValue" -> sessionValue,
+        "checkpointValue" -> checkpointValue))
+  }
+
   def pythonStreamingDataSourceRuntimeError(
       action: String,
       message: String): SparkException = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
index f1604223e256a..037d65c87509f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.classic.{SparkSession, StreamingQuery}
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger
@@ -341,9 +342,22 @@ abstract class StreamExecution(
     // Restore relevant
SQL configs from checkpoint before evaluating logicalPlan. // This ensures configs like ENABLE_STREAMING_SOURCE_EVOLUTION are set correctly // when the logical plan determines source naming strategy. + val sessionEnforcementBeforeRestore = + sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution offsetLog.getLatest().foreach { case (_, offsetSeq) => offsetSeq.metadataOpt.foreach { metadata => OffsetSeqMetadata.setSessionConf(metadata, sparkSessionForStream.sessionState.conf) + + // Validate that critical configs haven't changed since checkpoint was created. + // This prevents config drift that could lead to incorrect query behavior. + val checkpointEnforcement = + OffsetSeqMetadata.readValueOpt( + metadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION).getOrElse("false") + if (sessionEnforcementBeforeRestore.toString != checkpointEnforcement) { + throw QueryExecutionErrors.streamingSourceEvolutionConfigMismatchError( + sessionValue = sessionEnforcementBeforeRestore.toString, + checkpointValue = checkpointEnforcement) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala index 745666f89e4f0..d4b838ab5ab4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala @@ -24,9 +24,10 @@ import org.mockito.ArgumentMatchers.{any, eq => meq} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, Tag} +import org.apache.spark.SparkIllegalArgumentException import org.apache.spark.sql._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.streaming.Trigger._ import org.apache.spark.util.Utils @@ -466,7 +467,7 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { "ENABLE_STREAMING_SOURCE_EVOLUTION should be true in offset metadata") } - test("enforcement config is restored from checkpoint on query restart") { + test("config mismatch detected when restarting with different enforcement mode") { // Start query WITHOUT enforcement (unnamed sources) withSQLConf( SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false", @@ -497,8 +498,7 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key) == "false", "ENABLE_STREAMING_SOURCE_EVOLUTION should be false in checkpoint") - // Restart with enforcement ENABLED in session config - // Checkpoint should restore enforcement=false, allowing unnamed sources + // Try to restart with enforcement ENABLED - should fail with config mismatch withSQLConf( SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "true", SQLConf.STREAMING_OFFSET_LOG_FORMAT_VERSION.key -> "2") { @@ -507,20 +507,22 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { val df2 = spark.readStream .format("org.apache.spark.sql.streaming.test") - .load() // Still unnamed - - val q2 = df2.writeStream - .format("org.apache.spark.sql.streaming.test") - .option("checkpointLocation", checkpointLocation.toString) - .trigger(ProcessingTime(10.seconds)) - .start() - q2.processAllAvailable() + .name("source1") // Must be named when enforcement=true + .load() - // Verify the session config was overridden 
by checkpoint metadata - assert(!spark.sessionState.conf.enableStreamingSourceEvolution, - "Session config should be restored from checkpoint to false") + val e = intercept[StreamingQueryException] { + val q2 = df2.writeStream + .format("org.apache.spark.sql.streaming.test") + .option("checkpointLocation", checkpointLocation.toString) + .trigger(ProcessingTime(10.seconds)) + .start() + q2.awaitTermination() + } - q2.stop() + checkError( + exception = e.getCause.asInstanceOf[SparkIllegalArgumentException], + condition = "STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH", + parameters = Map("sessionValue" -> "true", "checkpointValue" -> "false")) } } } @@ -539,7 +541,8 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { ) // Verify that reading the config returns the default value (false) - val value = OffsetSeqMetadata.readValueOpt(oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + val value = OffsetSeqMetadata.readValueOpt( + oldMetadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) assert(value.contains("false"), s"Expected default value 'false' for missing config, but got: $value") @@ -552,7 +555,8 @@ class StreamingSourceEvolutionSuite extends StreamTest with BeforeAndAfterEach { ) ) - val valueV2 = OffsetSeqMetadata.readValueOpt(oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) + val valueV2 = OffsetSeqMetadata.readValueOpt( + oldMetadataV2, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION) assert(valueV2.contains("false"), s"Expected default value 'false' for missing config in V2, but got: $valueV2") }
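
Reviewer note (illustrative only, not part of the patch): a minimal Scala sketch of how the new CONFIG_MISMATCH check is expected to surface for users once this series is applied, mirroring the "config mismatch detected when restarting with different enforcement mode" test above. The rate/console formats, the checkpoint path, and the source name "src" are placeholder choices for the example; the config key is the one referenced by the new error message, and the remedy is simply to set the session value back to what the checkpoint recorded.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("config-mismatch-demo")
      .getOrCreate()
    val checkpoint = "/tmp/config-mismatch-demo"  // hypothetical path

    // First run: the checkpoint's offset metadata records enableSourceEvolution=false.
    spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "false")
    val q1 = spark.readStream.format("rate").load()
      .writeStream.format("console")
      .option("checkpointLocation", checkpoint)
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
    q1.processAllAvailable()
    q1.stop()

    // Restart with the session config flipped to true. The source is named because
    // enforcement requires it, but StreamExecution now compares the session value
    // against the checkpointed value and fails fast instead of silently restoring
    // the old setting.
    spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "true")
    try {
      val q2 = spark.readStream.format("rate").name("src").load()
        .writeStream.format("console")
        .option("checkpointLocation", checkpoint)
        .start()
      q2.processAllAvailable()
    } catch {
      case e: StreamingQueryException =>
        // Expected cause: SparkIllegalArgumentException with condition
        // STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH
        // (sessionValue = "true", checkpointValue = "false").
        println(e.getCause.getMessage)
    }

    // Remedy: align the session config with the checkpoint value before restarting.
    spark.conf.set("spark.sql.streaming.queryEvolution.enableSourceEvolution", "false")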