5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -6093,6 +6093,11 @@
"Streaming query evolution error:"
],
"subClass" : {
"CONFIG_MISMATCH" : {
"message" : [
"Configuration mismatch detected when resuming streaming query. The session config for spark.sql.streaming.queryEvolution.enableSourceEvolution is '<sessionValue>', but the checkpoint was created with '<checkpointValue>'. Please set the session config to match the checkpoint value to ensure consistent query behavior."
]
},
"DUPLICATE_SOURCE_NAMES" : {
"message" : [
"Duplicate streaming source names detected: <names>. Each streaming source must have a unique name."
@@ -47,6 +47,32 @@ sealed trait StreamingSourceIdentifyingName {
case FlowAssigned(name) => s"""name="$name""""
case Unassigned => "name=<Unassigned>"
}

/**
* Extracts only user-provided names, filtering out flow-assigned or unassigned sources.
* Used when narrowing to explicitly user-specified names (e.g., when passing to DataSource).
*
* @return Some(UserProvided) if this is a user-provided name, None otherwise
*/
def toUserProvided: Option[UserProvided] = this match {
case up: UserProvided => Some(up)
case _ => None
}

/**
* Extracts the name string from named sources (UserProvided or FlowAssigned).
* Returns None for Unassigned sources.
*
* Useful for pattern matching when both UserProvided and FlowAssigned should be
* treated identically (e.g., when computing metadata paths or building sourceIdMap).
*
* @return Some(name) for named sources, None for Unassigned
*/
def nameOpt: Option[String] = this match {
case UserProvided(name) => Some(name)
case FlowAssigned(name) => Some(name)
case Unassigned => None
}
}
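For illustration, a minimal sketch of how the two new helpers behave. The source names below are invented; `UserProvided`, `FlowAssigned`, and `Unassigned` are the cases of `StreamingSourceIdentifyingName` defined in this file.

```scala
// Hypothetical names, for illustration only.
val explicit: StreamingSourceIdentifyingName = UserProvided("kafka_orders")
val assigned: StreamingSourceIdentifyingName = FlowAssigned("flow_1")
val unnamed: StreamingSourceIdentifyingName = Unassigned

explicit.toUserProvided  // Some(UserProvided("kafka_orders"))
assigned.toUserProvided  // None -- only explicitly user-specified names are kept
assigned.nameOpt         // Some("flow_1") -- named, whether by the user or by a flow
unnamed.nameOpt          // None
```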

/**
@@ -1832,6 +1832,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
"operation" -> operation))
}

def streamingSourceEvolutionConfigMismatchError(
sessionValue: String,
checkpointValue: String): SparkIllegalArgumentException = {
new SparkIllegalArgumentException(
errorClass = "STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH",
messageParameters = Map(
"sessionValue" -> sessionValue,
"checkpointValue" -> checkpointValue))
}
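To connect this helper with the `CONFIG_MISMATCH` template added in error-conditions.json above, a call with made-up values would surface roughly the message shown in the comments; the exact rendered prefix depends on Spark's error framework.

```scala
// Hypothetical values, shown only to tie the helper to the JSON template.
throw QueryExecutionErrors.streamingSourceEvolutionConfigMismatchError(
  sessionValue = "true",
  checkpointValue = "false")
// Expected message (roughly): [STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH]
// Configuration mismatch detected when resuming a streaming query. The session config for
// spark.sql.streaming.queryEvolution.enableSourceEvolution is 'true', but the checkpoint
// was created with 'false'. ...
```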

def pythonStreamingDataSourceRuntimeError(
action: String,
message: String): SparkException = {
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.analysis.NamedStreamingRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedDataSource}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.classic.SparkSession
@@ -84,10 +84,11 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = optionsWithPath.originalMap)
options = optionsWithPath.originalMap,
userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided)
val v1Relation = ds match {
case _: StreamSourceProvider =>
Some(StreamingRelation(v1DataSource, sourceIdentifyingName))
Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
@@ -112,16 +113,16 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation,
sourceIdentifyingName)
v1DataSource.streamingSourceIdentifyingName)

// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
case _ => StreamingRelation(v1DataSource, sourceIdentifyingName)
case _ => StreamingRelation(v1DataSource)
}

case _ =>
// Code path for data source v1.
StreamingRelation(v1DataSource, sourceIdentifyingName)
StreamingRelation(v1DataSource)
}

case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths) =>
@@ -148,7 +149,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
options = optionsWithPath.originalMap)
val v1Relation = ds match {
case _: StreamSourceProvider =>
Some(StreamingRelation(v1DataSource, Unassigned))
Some(StreamingRelation(v1DataSource))
case _ => None
}
ds match {
@@ -173,7 +174,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation,
Unassigned)
v1DataSource.streamingSourceIdentifyingName)

// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.DataSourceOptions
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.StreamingSourceIdentifyingName
import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, Unassigned, UserProvided}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, TypeUtils}
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.classic.Dataset
@@ -103,13 +103,16 @@ case class DataSource(
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty,
catalogTable: Option[CatalogTable] = None,
userSpecifiedStreamingSourceName: Option[StreamingSourceIdentifyingName] = None)
userSpecifiedStreamingSourceName: Option[UserProvided] = None)
extends SessionStateHelper with Logging {

case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])

private val conf: SQLConf = getSqlConf(sparkSession)

lazy val streamingSourceIdentifyingName: StreamingSourceIdentifyingName =
userSpecifiedStreamingSourceName.getOrElse(Unassigned)

lazy val providingClass: Class[_] = {
val cls = DataSource.lookupDataSource(className, conf)
// `providingClass` is used for resolving data source relation for catalog tables.
@@ -305,7 +305,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
userSpecifiedSchema = Some(table.schema),
options = dsOptions,
catalogTable = Some(table),
userSpecifiedStreamingSourceName = Some(sourceIdentifyingName))
userSpecifiedStreamingSourceName = sourceIdentifyingName.toUserProvided)
StreamingRelation(dataSource)
}

@@ -203,7 +203,7 @@ object OffsetSeqMetadata extends Logging {
STATE_STORE_ROCKSDB_FORMAT_VERSION, STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION,
STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION,
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN, STREAMING_STATE_STORE_ENCODING_FORMAT,
STATE_STORE_ROW_CHECKSUM_ENABLED
STATE_STORE_ROW_CHECKSUM_ENABLED, ENABLE_STREAMING_SOURCE_EVOLUTION
)

/**
@@ -251,7 +251,8 @@ object OffsetSeqMetadata extends Logging {
PRUNE_FILTERS_CAN_PRUNE_STREAMING_SUBPLAN.key -> "true",
STREAMING_STATE_STORE_ENCODING_FORMAT.key -> "unsaferow",
STATE_STORE_ROW_CHECKSUM_ENABLED.key -> "false",
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1"
STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "1",
ENABLE_STREAMING_SOURCE_EVOLUTION.key -> "false"
)
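The registered default of "false" is what matters when resuming from a checkpoint written before this config existed: the default is applied when the key is absent from older checkpoint metadata, so such checkpoints keep the legacy positional source IDs. A self-contained sketch of that lookup-with-fallback, where a plain `Map` stands in for the checkpoint metadata (not the actual `OffsetSeqMetadata` code):

```scala
// Minimal emulation of the fallback behavior, for illustration only.
val checkpointConfs: Map[String, String] = Map.empty  // old checkpoint: key never written
val key = "spark.sql.streaming.queryEvolution.enableSourceEvolution"
val defaults = Map(key -> "false")

val effective = checkpointConfs.getOrElse(key, defaults(key))
// effective == "false": the resumed query keeps positional source IDs
```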

def readValue[T](metadataLog: OffsetSeqMetadataBase, confKey: ConfigEntry[T]): String = {
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming.runtime

import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.{Map => MutableMap}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -30,7 +32,7 @@ import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp, FileSourceMetadataAttribute, LocalTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState, FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState, TransformWithStateInPySpark}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, Unassigned, WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.classic.{Dataset, SparkSession}
@@ -48,6 +50,7 @@ import org.apache.spark.sql.execution.streaming.runtime.AcceptsLatestSeenOffsetH
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
import org.apache.spark.sql.execution.streaming.state.{OfflineStateRepartitionUtils, StateSchemaBroadcast, StateStoreErrors}
import org.apache.spark.sql.execution.streaming.utils.StreamingUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.PartitionOffsetWithIndex
import org.apache.spark.sql.streaming.Trigger
@@ -168,7 +171,7 @@ class MicroBatchExecution(
assert(queryExecutionThread eq Thread.currentThread,
"logicalPlan must be initialized in QueryExecutionThread " +
s"but the current thread was ${Thread.currentThread}")
var nextSourceId = 0L
val nextSourceId = new AtomicLong(0L)
val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2ScanRelation]()
@@ -183,15 +186,17 @@ class MicroBatchExecution(
val disabledSources =
Utils.stringToSeq(sparkSession.sessionState.conf.disabledV2StreamingMicroBatchReaders)

val enforceNamed = sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val _logicalPlan = analyzedPlan.transform {
case streamingRelation @ StreamingRelation(
dataSourceV1, sourceName, output, sourceIdentifyingName) =>
toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val metadataPath = StreamingUtils.getMetadataPath(
sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
val source = dataSourceV1.createSource(metadataPath)
nextSourceId += 1
logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] " +
log"from DataSourceV1 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, sourceName)}' " +
log"[${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dataSourceV1)}]")
@@ -206,8 +211,8 @@
if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
val metadataPath = StreamingUtils.getMetadataPath(
sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
logInfo(log"Reading table [${MDC(LogKeys.STREAMING_TABLE, table)}] " +
log"from DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -224,7 +229,8 @@
trigger match {
case RealTimeTrigger(duration) => Some(duration)
case _ => None
}
},
sourceIdentifyingName
)
StreamingDataSourceV2ScanRelation(relation, scan, output, stream)
})
@@ -234,10 +240,10 @@
} else {
v2ToExecutionRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val metadataPath = StreamingUtils.getMetadataPath(
sourceIdentifyingName, nextSourceId, resolvedCheckpointRoot)
val source =
v1.get.asInstanceOf[StreamingRelation].dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(log"Using Source [${MDC(LogKeys.STREAMING_SOURCE, source)}] from " +
log"DataSourceV2 named '${MDC(LogKeys.STREAMING_DATA_SOURCE_NAME, srcName)}' " +
log"${MDC(LogKeys.STREAMING_DATA_SOURCE_DESCRIPTION, dsStr)}")
@@ -247,17 +253,37 @@
})
}
}
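The `StreamingUtils.getMetadataPath` helper used above is not part of the hunks shown in this view. Based only on its call sites (it takes the source's identifying name, the `AtomicLong` counter, and the checkpoint root, and the old `nextSourceId += 1` lines are gone), a plausible sketch of the intended behavior is below; the PR's actual implementation may differ.

```scala
// Speculative sketch -- not the PR's StreamingUtils.getMetadataPath.
import java.util.concurrent.atomic.AtomicLong

def getMetadataPath(
    sourceIdentifyingName: StreamingSourceIdentifyingName,
    nextSourceId: AtomicLong,
    resolvedCheckpointRoot: String): String = {
  sourceIdentifyingName.nameOpt match {
    // Named source: a stable, name-keyed metadata directory survives plan changes.
    case Some(name) => s"$resolvedCheckpointRoot/sources/$name"
    // Unnamed source: fall back to the legacy positional layout.
    case None => s"$resolvedCheckpointRoot/sources/${nextSourceId.getAndIncrement()}"
  }
}
```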
sources = _logicalPlan.collect {

// Extract sources and their sourceIdentifyingName for sourceIdMap mapping
val sourcesWithNames = _logicalPlan.collect {
// v1 source
case s: StreamingExecutionRelation => s.source
case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName)
// v2 source
case r: StreamingDataSourceV2ScanRelation => r.stream
case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName)
}
sources = sourcesWithNames.map(_._1)

if (enforceNamed) {
// When enforcement is enabled, all sources should be named after validation in analysis.
// This assertion ensures that the validation in NameStreamingSources worked correctly.
assert(sourcesWithNames.forall(s => s._2 != Unassigned),
"All sources should be named at this point - validation should have happened in analysis")

// Create source ID mapping using names (for OffsetMap format)
sourceIdMap = sourcesWithNames.map { case (source, sourceIdentifyingName) =>
sourceIdentifyingName.nameOpt match {
case Some(name) => name -> source
case None =>
throw new IllegalStateException(
"Unassigned sources should not exist when enforcement is enabled")
}
}.toMap
} else {
// When enforcement is disabled, use positional indices (backward compatibility)
sourceIdMap = sources.zipWithIndex.map {
case (source, index) => index.toString -> source
}.toMap
}

// Create source ID mapping for OffsetMap support
sourceIdMap = sources.zipWithIndex.map {
case (source, index) => index.toString -> source
}.toMap
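To make the two keying schemes concrete, a small self-contained illustration; plain strings stand in for the actual `SparkDataStream` instances, and the source names are invented.

```scala
// Illustration only -- not MicroBatchExecution code.
val sourcesWithNames: Seq[(String, Option[String])] =
  Seq(("sourceA", Some("kafka_orders")), ("sourceB", Some("rate_input")))

// enforceNamed = true: keys are the stable source names (OffsetMap format).
val byName = sourcesWithNames.map { case (s, name) => name.get -> s }.toMap
// Map("kafka_orders" -> "sourceA", "rate_input" -> "sourceB")

// enforceNamed = false: keys are positional indices (legacy offset log behavior).
val byIndex = sourcesWithNames.map(_._1).zipWithIndex
  .map { case (s, i) => i.toString -> s }.toMap
// Map("0" -> "sourceA", "1" -> "sourceB")
```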

// Inform the source if it is in real time mode
if (trigger.isInstanceOf[RealTimeTrigger]) {
@@ -41,6 +41,7 @@ import org.apache.spark.sql.classic.{SparkSession, StreamingQuery}
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.streaming.ContinuousTrigger
@@ -337,6 +338,29 @@ abstract class StreamExecution(
}

getLatestExecutionContext().updateStatusMessage("Initializing sources")

// Restore relevant SQL configs from checkpoint before evaluating logicalPlan.
// This ensures configs like ENABLE_STREAMING_SOURCE_EVOLUTION are set correctly
// when the logical plan determines source naming strategy.
val sessionEnforcementBeforeRestore =
sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution
offsetLog.getLatest().foreach { case (_, offsetSeq) =>
offsetSeq.metadataOpt.foreach { metadata =>
OffsetSeqMetadata.setSessionConf(metadata, sparkSessionForStream.sessionState.conf)

// Validate that critical configs haven't changed since checkpoint was created.
// This prevents config drift that could lead to incorrect query behavior.
val checkpointEnforcement =
OffsetSeqMetadata.readValueOpt(
metadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION).getOrElse("false")
if (sessionEnforcementBeforeRestore.toString != checkpointEnforcement) {
throw QueryExecutionErrors.streamingSourceEvolutionConfigMismatchError(
sessionValue = sessionEnforcementBeforeRestore.toString,
checkpointValue = checkpointEnforcement)
}
}
}

// force initialization of the logical plan so that the sources can be created
logicalPlan

@@ -36,18 +36,9 @@ import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
// Extract source identifying name from DataSource for stable checkpoints
val sourceIdentifyingName = dataSource.userSpecifiedStreamingSourceName.getOrElse(Unassigned)
StreamingRelation(
dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
sourceIdentifyingName)
}

def apply(
dataSource: DataSource,
sourceIdentifyingName: StreamingSourceIdentifyingName): StreamingRelation = {
StreamingRelation(
dataSource, dataSource.sourceInfo.name, toAttributes(dataSource.sourceInfo.schema),
sourceIdentifyingName)
dataSource.streamingSourceIdentifyingName)
}
}
