[SPARK-55601][SS] Hook StreamingSourceIdentifyingName into MicrobatchExecution for source naming#54373
Open
ericm-db wants to merge 8 commits intoapache:masterfrom
Open
[SPARK-55601][SS] Hook StreamingSourceIdentifyingName into MicrobatchExecution for source naming#54373ericm-db wants to merge 8 commits intoapache:masterfrom
ericm-db wants to merge 8 commits intoapache:masterfrom
Conversation
Move streamingSourceIdentifyingName from being passed explicitly through StreamingRelation constructors to being derived from DataSource itself via a lazy val, removing the overloaded StreamingRelation.apply and eliminating scattered Unassigned defaults at call sites.
…hs and sourceIdMap - Centralize streamingSourceIdentifyingName in DataSource as a lazy val (UserProvided/FlowAssigned -> named, Unassigned -> positional fallback) - Remove overloaded StreamingRelation.apply; single apply delegates to DataSource.streamingSourceIdentifyingName - Pass userSpecifiedStreamingSourceName through DataSource in ResolveDataSource so all StreamingRelation/V2 call sites use the single-arg apply - In MicroBatchExecution, use MicroBatchExecution.getMetadataPath to derive source metadata paths from sourceIdentifyingName rather than hard-coded positional IDs - When ENABLE_STREAMING_SOURCE_EVOLUTION is on, build sourceIdMap with user-provided names; otherwise fall back to positional indices - Pass sourceIdentifyingName through to StreamingDataSourceV2Relation
… and metadata paths Add tests covering: - Named sources produce named metadata paths (sources/<name>) - Unnamed sources fall back to positional IDs (sources/0, sources/1, ...) - Source evolution: reordering and adding sources with named sources - Named source enforcement requires V2 (OffsetMap) offset log format - Names are preserved correctly through multi-level union operations - Naming sources without enforcement enabled raises an error Also adds BeforeAndAfterEach, verifySourcePath helper (mockito), and updates testWithSourceEvolution to set STREAMING_OFFSET_LOG_FORMAT_VERSION=2.
f5342c3 to
8d3d87c
Compare
…ovided] Change DataSource.userSpecifiedStreamingSourceName from Option[StreamingSourceIdentifyingName] to Option[UserProvided], preventing Unassigned/FlowAssigned from being passed and ensuring DLT flow context fallback always fires when no user-provided name exists. - Add toUserProvided helper to StreamingSourceIdentifyingName - Use sourceIdentifyingName.toUserProvided in ResolveDataSource and DataSourceStrategy - Update tests to pass None instead of Some(Unassigned) This fixes StreamingTableExternalLocationSuite failures where passing Some(Unassigned) short-circuited the DLT flow context fallback that assigns FlowAssigned names from checkpoints.
...re/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
Outdated
Show resolved
Hide resolved
...re/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingQueryEvolutionSuite.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSourceEvolutionSuite.scala
Show resolved
Hide resolved
…data path logic This commit addresses several improvements to streaming source evolution: 1. Persist ENABLE_STREAMING_SOURCE_EVOLUTION config in offset metadata - Added to relevantSQLConfs so it's automatically persisted - Ensures enforcement mode stays consistent across query restarts 2. Restore configs from checkpoint before logicalPlan evaluation - Moved config restoration to StreamExecution before logicalPlan is forced - Prevents reading stale session config when resuming queries 3. Refactor metadata path computation - Moved getMetadataPath from MicroBatchExecution to StreamingUtils - Added nameOpt helper to StreamingSourceIdentifyingName - Simplified pattern matching using the new helper 4. Rename test suite for clarity - StreamingQueryEvolutionSuite → StreamingSourceEvolutionSuite
.../src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingSourceIdentifyingName.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/OffsetSeq.scala
Show resolved
Hide resolved
…rsistence This commit improves streaming source evolution by implementing defense in depth: 1. Persist enforcement config in checkpoint metadata - Added ENABLE_STREAMING_SOURCE_EVOLUTION to relevantSQLConfs - Added default value "false" for backward compatibility with upgrades - Config is now stored in offset metadata for consistency 2. Validate config mismatch on query restart - Detects when session config differs from checkpoint config - Throws STREAMING_QUERY_EVOLUTION_ERROR.CONFIG_MISMATCH - Prevents inconsistent query behavior from config drift 3. Add helper methods and documentation - Added nameOpt to StreamingSourceIdentifyingName for cleaner pattern matching - Documented toUserProvided and nameOpt methods - Enhanced documentation for StreamingUtils methods 4. Refactor metadata path logic - Moved getMetadataPath from MicroBatchExecution to StreamingUtils - Better organization and reusability 5. Add comprehensive tests - Test config persistence in offset metadata - Test config mismatch detection on restart - Test backward compatibility with old checkpoints Defense in depth approach: - Fast fail at analysis time (current session config validation) - Fail on checkpoint mismatch at query start (new validation)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Move streamingSourceIdentifyingName from being passed explicitly through StreamingRelation constructors to being derived from DataSource itself via a lazy val, removing the overloaded StreamingRelation.apply and eliminating scattered Unassigned defaults at call sites.
Pull the names from StreamingRelation and StreamingRelationV2 in MicroBatchExecution to key the OffsetMap by the source names during query execution
Why are the changes needed?
This is the last and final piece to allow for source naming
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests
Was this patch authored or co-authored using generative AI tooling?
No