Skip to content

Commit 5da9d2a

Browse files
ericm-dbanishshri-db
authored andcommitted
[SPARK-54867][SS] Introduce NamedStreamingRelation wrapper for source identification during analysis
### What changes were proposed in this pull request? This PR introduces infrastructure for tracking and propagating source identifying names through query analysis for streaming queries. It adds: 1. **StreamingSourceIdentifyingName** - A sealed trait hierarchy representing the naming state of streaming sources: - `UserProvided(name)` - Explicitly set via `.name()` API - `FlowAssigned(name)` - Assigned by external flow systems (e.g., SDP) - `Unassigned` - No name assigned yet (to be auto-generated) 2. **NamedStreamingRelation** - A transparent wrapper node that: - Carries source identifying names through the analyzer phase - Extends `UnaryNode` for transparent interaction with analyzer rules - Stays unresolved until explicitly unwrapped by a future `NameStreamingSources` analyzer rule - Provides `withUserProvidedName()` to attach user-specified names 3. **NAMED_STREAMING_RELATION** tree pattern for efficient pattern matching ### Why are the changes needed? Streaming sources need stable, predictable names for: - **Checkpoint location stability** - Schema evolution and offset tracking require consistent source identification - **Schema lookup at specific offsets** - Analysis-time operations need to reference sources by name - **Flow integration** - SDP and similar systems need per-source metadata paths By introducing this wrapper during analysis (rather than at execution planning), we enable these capabilities while maintaining a clean separation between parsing, analysis, and execution phases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests in `NamedStreamingRelationSuite` covering: - Source name state transitions (Unassigned → UserProvided) - Output delegation to child plan - Tree pattern registration - Resolved state behavior - String representation ### Was this patch authored or co-authored using generative AI tooling? No. Closes #53639 from ericm-db/named-streaming-relation. Authored-by: ericm-db <[email protected]> Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent a309a05 commit 5da9d2a

File tree

4 files changed

+257
-0
lines changed

4 files changed

+257
-0
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
22+
import org.apache.spark.sql.catalyst.streaming.{StreamingSourceIdentifyingName, UserProvided}
23+
import org.apache.spark.sql.catalyst.trees.TreePattern.{NAMED_STREAMING_RELATION, TreePattern}
24+
25+
/**
26+
* A wrapper for streaming relations that carries a source identifying name through analysis.
27+
*
28+
* This node is introduced during query parsing/resolution and is removed by the
29+
* [[NameStreamingSources]] analyzer rule. It serves to:
30+
* 1. Track user-provided source names from `.name()` API
31+
* 2. Track flow-assigned names from SDP context
32+
* 3. Ensure all sources have names before execution planning
33+
*
34+
* By extending [[UnaryNode]], this wrapper is transparent to analyzer rules - they naturally
35+
* descend into the child plan via `mapChildren`, resolve it, and the wrapper persists with the
36+
* updated child. This eliminates the need for explicit handling in most analyzer rules.
37+
*
38+
* The naming happens in the analyzer (before execution) to enable:
39+
* - Schema lookup at specific offsets during analysis
40+
* - Stable checkpoint locations for source evolution
41+
* - SDP flow integration with per-source metadata paths
42+
*
43+
* @param child The underlying streaming relation (UnresolvedDataSource, etc.)
44+
* @param sourceIdentifyingName The source identifying name (UserProvided, FlowAssigned,
45+
* or Unassigned)
46+
*/
47+
case class NamedStreamingRelation(
48+
child: LogicalPlan,
49+
sourceIdentifyingName: StreamingSourceIdentifyingName)
50+
extends UnaryNode {
51+
52+
override def isStreaming: Boolean = true
53+
54+
// Delegate output to child for transparent wrapping
55+
override def output: Seq[Attribute] = child.output
56+
57+
// Keep unresolved until NameStreamingSources explicitly unwraps this node.
58+
// This ensures the wrapper persists through analysis until we're ready to
59+
// propagate the sourceIdentifyingName to the underlying StreamingRelationV2.
60+
override lazy val resolved: Boolean = false
61+
62+
override protected def withNewChildInternal(newChild: LogicalPlan): NamedStreamingRelation =
63+
copy(child = newChild)
64+
65+
/**
66+
* Attaches a user-provided name from the `.name()` API.
67+
* If nameOpt is None, returns this node unchanged.
68+
*
69+
* @param nameOpt The user-provided source name
70+
* @return A new NamedStreamingRelation with the user name attached
71+
*/
72+
def withUserProvidedName(nameOpt: Option[String]): NamedStreamingRelation = {
73+
nameOpt.map { n =>
74+
copy(sourceIdentifyingName = UserProvided(n))
75+
}.getOrElse {
76+
this
77+
}
78+
}
79+
80+
override val nodePatterns: Seq[TreePattern] = Seq(NAMED_STREAMING_RELATION)
81+
82+
override def toString: String = {
83+
s"NamedStreamingRelation($child, $sourceIdentifyingName)"
84+
}
85+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.streaming
19+
20+
/**
21+
* Represents the identifying name state for a streaming source during query analysis.
22+
*
23+
* Source names can be:
24+
* - User-provided via the `.name()` API
25+
* - Flow-assigned by external systems (e.g., SDP)
26+
* - Unassigned, to be auto-generated during analysis
27+
*/
28+
sealed trait StreamingSourceIdentifyingName {
29+
override def toString: String = this match {
30+
case UserProvided(name) => s"""name="$name""""
31+
case FlowAssigned(name) => s"""name="$name""""
32+
case Unassigned => "name=<Unassigned>"
33+
}
34+
}
35+
36+
/**
37+
* A source name explicitly provided by the user via the `.name()` API.
38+
* Takes highest precedence.
39+
*/
40+
case class UserProvided(name: String) extends StreamingSourceIdentifyingName
41+
42+
/**
43+
* A source name assigned by an external flow system (e.g., SDP).
44+
* Used when the source is part of a managed pipeline.
45+
*/
46+
case class FlowAssigned(name: String) extends StreamingSourceIdentifyingName
47+
48+
/**
49+
* No name has been assigned yet. The analyzer will auto-generate one if needed.
50+
*/
51+
case object Unassigned extends StreamingSourceIdentifyingName

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ object TreePattern extends Enumeration {
179179
val WITH_WINDOW_DEFINITION: Value = Value
180180

181181
// Unresolved Plan patterns (Alphabetically ordered)
182+
val NAMED_STREAMING_RELATION: Value = Value
182183
val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value
183184
val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value
184185
val UNRESOLVED_HAVING: Value = Value
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
22+
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
23+
import org.apache.spark.sql.catalyst.streaming.{FlowAssigned, Unassigned, UserProvided}
24+
import org.apache.spark.sql.catalyst.trees.TreePattern
25+
import org.apache.spark.sql.types.IntegerType
26+
27+
/**
28+
* Unit tests for the NamedStreamingRelation wrapper node.
29+
*/
30+
class NamedStreamingRelationSuite extends SparkFunSuite {
31+
32+
private def createMockPlan(): LocalRelation = {
33+
LocalRelation(AttributeReference("id", IntegerType)())
34+
}
35+
36+
test("Unassigned sourceIdentifyingName") {
37+
val plan = createMockPlan()
38+
val wrapper = NamedStreamingRelation(plan, Unassigned)
39+
40+
assert(wrapper.child eq plan)
41+
assert(wrapper.sourceIdentifyingName == Unassigned)
42+
}
43+
44+
test("withUserProvidedName sets UserProvided name") {
45+
val plan = createMockPlan()
46+
val wrapper = NamedStreamingRelation(plan, Unassigned)
47+
val named = wrapper.withUserProvidedName(Some("my_source"))
48+
49+
assert(named.sourceIdentifyingName == UserProvided("my_source"))
50+
}
51+
52+
test("withUserProvidedName(None) returns same instance") {
53+
val plan = createMockPlan()
54+
val wrapper = NamedStreamingRelation(plan, Unassigned)
55+
val result = wrapper.withUserProvidedName(None)
56+
57+
assert(result eq wrapper)
58+
}
59+
60+
test("isStreaming returns true") {
61+
val plan = createMockPlan()
62+
val wrapper = NamedStreamingRelation(plan, Unassigned)
63+
64+
assert(wrapper.isStreaming)
65+
}
66+
67+
test("has NAMED_STREAMING_RELATION tree pattern") {
68+
val plan = createMockPlan()
69+
val wrapper = NamedStreamingRelation(plan, Unassigned)
70+
71+
assert(wrapper.nodePatterns.contains(TreePattern.NAMED_STREAMING_RELATION))
72+
}
73+
74+
test("resolved is false until unwrapped") {
75+
val plan = createMockPlan()
76+
val wrapper = NamedStreamingRelation(plan, UserProvided("test"))
77+
78+
// Even with a named child, wrapper stays unresolved
79+
assert(!wrapper.resolved)
80+
}
81+
82+
test("output delegates to child") {
83+
val plan = createMockPlan()
84+
val wrapper = NamedStreamingRelation(plan, Unassigned)
85+
86+
assert(wrapper.output == plan.output)
87+
}
88+
89+
test("pattern matching on sourceIdentifyingName variants") {
90+
val plan = createMockPlan()
91+
92+
val userProvided = NamedStreamingRelation(plan, UserProvided("test"))
93+
val flowAssigned = NamedStreamingRelation(plan, FlowAssigned("0"))
94+
val unassigned = NamedStreamingRelation(plan, Unassigned)
95+
96+
def extractName(wrapper: NamedStreamingRelation): Option[String] = {
97+
wrapper.sourceIdentifyingName match {
98+
case UserProvided(n) => Some(n)
99+
case FlowAssigned(n) => Some(n)
100+
case Unassigned => None
101+
}
102+
}
103+
104+
assert(extractName(userProvided).contains("test"))
105+
assert(extractName(flowAssigned).contains("0"))
106+
assert(extractName(unassigned).isEmpty)
107+
}
108+
109+
test("toString includes sourceIdentifyingName") {
110+
val plan = createMockPlan()
111+
112+
val userNamed = NamedStreamingRelation(plan, UserProvided("my_source"))
113+
val flowNamed = NamedStreamingRelation(plan, FlowAssigned("0"))
114+
val unnamed = NamedStreamingRelation(plan, Unassigned)
115+
116+
assert(userNamed.toString.contains("name=\"my_source\""))
117+
assert(flowNamed.toString.contains("name=\"0\""))
118+
assert(unnamed.toString.contains("name=<Unassigned>"))
119+
}
120+
}

0 commit comments

Comments
 (0)