Commit 6806c8b

[SPARK-55024][SQL][FOLLOWUP] Delay namespace length check to v1 identifier creation
### What changes were proposed in this pull request?

This is a followup to #53788, which moved the namespace length check from individual command handlers to the `CatalogAndIdentifier` extractor. That approach is too aggressive: users can extend the session catalog via `CatalogExtension` and support multi-part namespaces through v2 APIs. The check should only happen when we actually create v1 identifiers like `TableIdentifier`, not at the shared name-resolution layer.

### Changes

- **Remove the namespace check from `CatalogAndIdentifier.unapply`** so it remains a pure name-resolution mechanism, preserving `CatalogExtension` flexibility.
- **Tighten `CatalogV2Implicits.IdentifierHelper.asTableIdentifier` and `asFunctionIdentifier`** to require exactly one namespace part and throw `REQUIRES_SINGLE_PART_NAMESPACE` (instead of the less precise `IDENTIFIER_TOO_MANY_NAME_PARTS`). This centralizes the validation.
- **Remove the now-redundant `V2SessionCatalog.TableIdentifierHelper`** and use the unified `CatalogV2Implicits` conversion everywhere.
- **Simplify the `ResolveSessionCatalog` extractors** (`ResolvedV1Identifier`, `ResolvedIdentifierInSessionCatalog`, `ResolvedViewIdentifier`) to delegate to `ident.asTableIdentifier`.
- **Fix `SparkSqlParser` temp view creation** to check the name length before calling `asTableIdentifier`, so the user always sees `notAllowedToAddDBPrefixForTempViewError` instead of a generic error.

### Why are the changes needed?

`CatalogExtension` allows users to extend the built-in session catalog and potentially support multi-part namespaces for v2 operations. The early check in `CatalogAndIdentifier` would block such extensions. The namespace length should only be validated when we actually need to create v1 identifiers (e.g. `TableIdentifier`), which inherently require a single-part namespace.

Additionally, this PR unifies the scattered namespace validation into a single point (`CatalogV2Implicits.IdentifierHelper`), reducing code duplication and ensuring consistent `REQUIRES_SINGLE_PART_NAMESPACE` errors.

### Does this PR introduce _any_ user-facing change?

Yes. Multi-part namespace identifiers now flow through `CatalogAndIdentifier` without error, allowing `CatalogExtension` implementations to handle them. The error is only thrown when the session catalog actually needs to convert to a v1 identifier.

### How was this patch tested?

Existing tests updated:

- `LookupCatalogSuite` — removed the early-rejection test, added multi-part namespace cases back.
- `V2SessionCatalogSuite` — updated to expect `REQUIRES_SINGLE_PART_NAMESPACE`.
- `DataSourceV2SQLSuiteV1Filter` and `DataSourceV2FunctionSuite` — all 313 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Yes. Made with [Cursor](https://cursor.com)

Closes #54247 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
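To make the motivation concrete, here is a minimal sketch (not part of this PR) of a session-catalog extension that serves multi-part namespaces through the v2 API. The `NestedSessionCatalog` class and its in-memory table map are hypothetical; `DelegatingCatalogExtension` is Spark's convenience base class for `CatalogExtension`, and anything not overridden is forwarded to the built-in session catalog. The early check in `CatalogAndIdentifier` would have rejected a name like `ns1.ns2.tbl` before `loadTable` ever ran.

```scala
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}

// Hypothetical extension, registered via spark.sql.catalog.spark_catalog.
class NestedSessionCatalog extends DelegatingCatalogExtension {

  // Hypothetical in-memory store for tables under multi-part namespaces.
  private val nestedTables = new ConcurrentHashMap[Identifier, Table]()

  override def loadTable(ident: Identifier): Table = {
    if (ident.namespace().length > 1) {
      // Serve multi-part namespaces ourselves via the v2 API.
      Option(nestedTables.get(ident)).getOrElse {
        throw new NoSuchTableException(ident)
      }
    } else {
      // Single-part names go to the built-in session catalog as usual.
      super.loadTable(ident)
    }
  }
}
```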
1 parent b604338 commit 6806c8b

File tree: 20 files changed, +87 -134 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 1 addition & 1 deletion
```diff
@@ -5366,7 +5366,7 @@
     },
     "REQUIRES_SINGLE_PART_NAMESPACE" : {
       "message" : [
-        "<sessionCatalog> requires a single-part namespace, but got <namespace>."
+        "<sessionCatalog> requires a single-part namespace, but got identifier <identifier>."
       ],
       "sqlState" : "42K05"
     },
```

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala

Lines changed: 4 additions & 4 deletions
```diff
@@ -166,9 +166,9 @@ private[sql] object CatalogV2Implicits {
     def asMultipartIdentifier: Seq[String] = (ident.namespace :+ ident.name).toImmutableArraySeq
 
     def asTableIdentifier: TableIdentifier = ident.namespace match {
-      case ns if ns.isEmpty => TableIdentifier(ident.name)
       case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
-      case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
+      case _ =>
+        throw QueryCompilationErrors.requiresSinglePartNamespaceError(asMultipartIdentifier)
     }
 
     /**
@@ -192,9 +192,9 @@ private[sql] object CatalogV2Implicits {
     }
 
     def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
-      case ns if ns.isEmpty => FunctionIdentifier(ident.name())
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
-      case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
+      case _ =>
+        throw QueryCompilationErrors.requiresSinglePartNamespaceError(asMultipartIdentifier)
     }
 
     def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
```
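A quick sketch of the tightened conversion, assuming it is compiled inside the `org.apache.spark.sql` package (the implicits object is `private[sql]`): only exactly one namespace part converts to a v1 identifier. Note that, per the match above, an empty namespace now also throws, where it previously produced a bare `TableIdentifier`.

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier

// The single-part case still converts cleanly.
assert(Identifier.of(Array("db"), "tbl").asTableIdentifier.database.contains("db"))

// Any other shape now raises the precise error condition instead of
// IDENTIFIER_TOO_MANY_NAME_PARTS.
val thrown =
  try { Identifier.of(Array("ns1", "ns2"), "tbl").asTableIdentifier; None }
  catch { case e: AnalysisException => Some(e) }
assert(thrown.exists(_.getCondition == "REQUIRES_SINGLE_PART_NAMESPACE"))
```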

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala

Lines changed: 5 additions & 12 deletions
```diff
@@ -19,9 +19,7 @@ package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.util.ArrayImplicits._
 
 /**
  * A trait to encapsulate catalog lookup function and helpful extractors.
@@ -109,29 +107,24 @@ private[sql] trait LookupCatalog extends Logging {
 
     def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Identifier)] = {
       assert(nameParts.nonEmpty)
-      val (catalog, ident) = if (nameParts.length == 1) {
-        (currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head))
+      if (nameParts.length == 1) {
+        Some((currentCatalog, Identifier.of(catalogManager.currentNamespace, nameParts.head)))
       } else if (nameParts.head.equalsIgnoreCase(globalTempDB)) {
         // Conceptually global temp views are in a special reserved catalog. However, the v2 catalog
         // API does not support view yet, and we have to use v1 commands to deal with global temp
         // views. To simplify the implementation, we put global temp views in a special namespace
         // in the session catalog. The special namespace has higher priority during name resolution.
         // For example, if the name of a custom catalog is the same with `GLOBAL_TEMP_DATABASE`,
         // this custom catalog can't be accessed.
-        (catalogManager.v2SessionCatalog, nameParts.asIdentifier)
+        Some((catalogManager.v2SessionCatalog, nameParts.asIdentifier))
       } else {
         try {
-          (catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier)
+          Some((catalogManager.catalog(nameParts.head), nameParts.tail.asIdentifier))
         } catch {
           case _: CatalogNotFoundException =>
-            (currentCatalog, nameParts.asIdentifier)
+            Some((currentCatalog, nameParts.asIdentifier))
         }
       }
-      if (CatalogV2Util.isSessionCatalog(catalog) && ident.namespace().length != 1) {
-        throw QueryCompilationErrors.requiresSinglePartNamespaceError(
-          ident.namespace().toImmutableArraySeq)
-      }
-      Some((catalog, ident))
     }
   }
```
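With the check removed, `CatalogAndIdentifier` is pure name resolution again. A minimal sketch of the new contract, written as if inside a `LookupCatalog` implementor (the `catalogManager` wiring is elided): a multi-part session-catalog name now resolves instead of throwing, and the length check is deferred to whoever actually needs a v1 identifier.

```scala
// Sketch: inside a class that mixes in LookupCatalog.
Seq("ns1", "ns2", "tbl") match {
  case CatalogAndIdentifier(catalog, ident) =>
    // Before this PR the session catalog rejected this name up front with
    // REQUIRES_SINGLE_PART_NAMESPACE; now it resolves and the caller decides.
    assert(ident.namespace().sameElements(Array("ns1", "ns2")))
    assert(ident.name() == "tbl")
}
```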

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -1542,12 +1542,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     new TableAlreadyExistsException(ident.asMultipartIdentifier)
   }
 
-  def requiresSinglePartNamespaceError(namespace: Seq[String]): Throwable = {
+  def requiresSinglePartNamespaceError(identifier: Seq[String]): Throwable = {
     new AnalysisException(
       errorClass = "REQUIRES_SINGLE_PART_NAMESPACE",
       messageParameters = Map(
         "sessionCatalog" -> CatalogManager.SESSION_CATALOG_NAME,
-        "namespace" -> toSQLId(namespace)))
+        "identifier" -> toSQLId(identifier)))
   }
 
   def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = {
```
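The rename matters for the message content too: callers now pass the full identifier rather than just its namespace, which is why the golden files further below change from `` `a`.`b` `` to `` `a`.`b`.`c` ``. A small sketch (again assuming compilation inside the `org.apache.spark.sql` package, since the object is `private[sql]`):

```scala
import org.apache.spark.sql.errors.QueryCompilationErrors

// Passing the full identifier ["a", "b", "c"] yields a message ending in
// "got identifier `a`.`b`." style quoting rather than the old "got `a`.`b`."
val err = QueryCompilationErrors.requiresSinglePartNamespaceError(Seq("a", "b", "c"))
assert(err.getMessage.contains("`a`.`b`.`c`"))
```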

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala

Lines changed: 2 additions & 17 deletions
```diff
@@ -57,15 +57,15 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
 
   test("catalog and identifier") {
     Seq(
-      // Session catalog with single-part namespace
       ("tbl", sessionCatalog, Seq("default"), "tbl"),
       ("db.tbl", sessionCatalog, Seq("db"), "tbl"),
       (s"$globalTempDB.tbl", sessionCatalog, Seq(globalTempDB), "tbl"),
+      (s"$globalTempDB.ns1.ns2.tbl", sessionCatalog, Seq(globalTempDB, "ns1", "ns2"), "tbl"),
+      ("ns1.ns2.tbl", sessionCatalog, Seq("ns1", "ns2"), "tbl"),
       ("`db.tbl`", sessionCatalog, Seq("default"), "db.tbl"),
       ("parquet.`file:/tmp/db.tbl`", sessionCatalog, Seq("parquet"), "file:/tmp/db.tbl"),
       ("`org.apache.spark.sql.json`.`s3://buck/tmp/abc.json`", sessionCatalog,
         Seq("org.apache.spark.sql.json"), "s3://buck/tmp/abc.json"),
-      // Non-session catalogs (no namespace restriction)
       ("prod.func", catalogs("prod"), Seq.empty, "func"),
       ("prod.db.tbl", catalogs("prod"), Seq("db"), "tbl"),
       ("test.db.tbl", catalogs("test"), Seq("db"), "tbl"),
@@ -79,21 +79,6 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
     }
   }
 
-  test("session catalog requires single-part namespace") {
-    // Multi-part namespaces are not allowed for session catalog
-    Seq(
-      "ns1.ns2.tbl", // two-part namespace
-      s"$globalTempDB.ns1.ns2.tbl" // three-part namespace
-    ).foreach { sql =>
-      val e = intercept[org.apache.spark.sql.AnalysisException] {
-        parseMultipartIdentifier(sql) match {
-          case CatalogAndIdentifier(_, _) =>
-        }
-      }
-      assert(e.getCondition === "REQUIRES_SINGLE_PART_NAMESPACE")
-    }
-  }
-
   test("table identifier") {
     Seq(
       ("tbl", "tbl", None),
```

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 6 additions & 15 deletions
```diff
@@ -37,7 +37,6 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructField, StructType}
-import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.SparkStringUtils
 
 /**
@@ -727,10 +726,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedPersistentView(catalog, ident, _) =>
         assert(isSessionCatalog(catalog))
-        assert(ident.namespace().length == 1)
-        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+        Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))
 
-      case ResolvedTempView(ident, _) => Some(ident.asTableIdentifier)
+      case ResolvedTempView(ident, _) =>
+        Some(TableIdentifier(ident.name(), ident.namespace().headOption))
 
       case _ => None
     }
@@ -763,24 +762,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   object ResolvedV1Identifier {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedIdentifier(catalog, ident) if supportsV1Command(catalog) =>
-        if (ident.namespace().length != 1) {
-          throw QueryCompilationErrors
-            .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
-        }
-        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+        Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))
       case _ => None
     }
   }
 
   // Use this object to help match commands that do not have a v2 implementation.
-  object ResolvedIdentifierInSessionCatalog{
+  object ResolvedIdentifierInSessionCatalog {
     def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedIdentifier(catalog, ident) if isSessionCatalog(catalog) =>
-        if (ident.namespace().length != 1) {
-          throw QueryCompilationErrors
-            .requiresSinglePartNamespaceError(ident.namespace().toImmutableArraySeq)
-        }
-        Some(TableIdentifier(ident.name, Some(ident.namespace.head), Some(catalog.name)))
+        Some(ident.asTableIdentifier.copy(catalog = Some(catalog.name)))
       case _ => None
     }
   }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -698,12 +698,12 @@ class SparkSqlAstBuilder extends AstBuilder {
     }
 
     withIdentClause(ctx.identifierReference(), Seq(qPlan), (ident, otherPlans) => {
-      val tableIdentifier = ident.asTableIdentifier
-      if (tableIdentifier.database.isDefined) {
+      if (ident.length > 1) {
         // Temporary view names should NOT contain database prefix like "database.table"
         throw QueryParsingErrors
-          .notAllowedToAddDBPrefixForTempViewError(tableIdentifier.nameParts, ctx)
+          .notAllowedToAddDBPrefixForTempViewError(ident, ctx)
       }
+      val tableIdentifier = TableIdentifier(ident.head)
 
       CreateViewCommand(
         tableIdentifier,
```
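The user-visible effect of the parser fix, sketched against a hypothetical `spark` session: the length check now runs before any conversion to `TableIdentifier`, so every multi-part temporary view name surfaces the dedicated db-prefix error rather than whichever error `asTableIdentifier` raised first for names with two or more namespace parts.

```scala
import org.apache.spark.sql.catalyst.parser.ParseException

// Hypothetical SparkSession `spark`; the statement itself is standard SQL.
try {
  spark.sql("CREATE TEMPORARY VIEW db.v AS SELECT 1")
} catch {
  case e: ParseException =>
    // Names like a.b.v now get the same db-prefix error as db.v.
    println(e.getMessage)
}
```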

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 2 additions & 21 deletions
```diff
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
 import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -45,6 +45,7 @@ import org.apache.spark.util.Utils
  */
 class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
   import V2SessionCatalog._
 
   override val defaultNamespace: Array[String] = Array(conf.defaultDatabase)
@@ -367,26 +368,6 @@ class V2SessionCatalog(catalog: SessionCatalog)
     catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
   }
 
-  implicit class TableIdentifierHelper(ident: Identifier) {
-    def asTableIdentifier: TableIdentifier = {
-      ident.namespace match {
-        case Array(db) =>
-          TableIdentifier(ident.name, Some(db))
-        case other =>
-          throw QueryCompilationErrors.requiresSinglePartNamespaceError(other.toImmutableArraySeq)
-      }
-    }
-
-    def asFunctionIdentifier: FunctionIdentifier = {
-      ident.namespace match {
-        case Array(db) =>
-          FunctionIdentifier(ident.name, Some(db))
-        case other =>
-          throw QueryCompilationErrors.requiresSinglePartNamespaceError(other.toImmutableArraySeq)
-      }
-    }
-  }
-
   override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
     case Array(db) =>
       catalog.databaseExists(db)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala

Lines changed: 1 addition & 2 deletions
```diff
@@ -40,7 +40,6 @@ class JDBCTableCatalog extends TableCatalog
   with FunctionCatalog
   with DataTypeErrorsBase
   with Logging {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   private var catalogName: String = null
   private var options: JDBCOptions = _
@@ -434,7 +433,7 @@ class JDBCTableCatalog extends TableCatalog
 
   override def loadFunction(ident: Identifier): UnboundFunction = {
     if (ident.namespace().nonEmpty) {
-      throw QueryCompilationErrors.noSuchFunctionError(ident.asFunctionIdentifier)
+      throw new NoSuchFunctionException(ident)
     }
     functions.get(ident.name()) match {
      case Some(func) =>
```

sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause-legacy.sql.out

Lines changed: 6 additions & 6 deletions
```diff
@@ -883,7 +883,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`",
+    "identifier" : "`a`.`b`.`c`",
     "sessionCatalog" : "spark_catalog"
   }
 }
@@ -897,7 +897,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`",
+    "identifier" : "`a`.`b`.`c`",
     "sessionCatalog" : "spark_catalog"
   }
 }
@@ -911,7 +911,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`",
+    "identifier" : "`a`.`b`.`c`",
     "sessionCatalog" : "spark_catalog"
   }
 }
@@ -925,7 +925,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`",
+    "identifier" : "`a`.`b`.`c`",
     "sessionCatalog" : "spark_catalog"
   }
 }
@@ -939,7 +939,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`.`c`",
+    "identifier" : "`a`.`b`.`c`.`d`",
     "sessionCatalog" : "spark_catalog"
   }
 }
@@ -975,7 +975,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : "REQUIRES_SINGLE_PART_NAMESPACE",
   "sqlState" : "42K05",
   "messageParameters" : {
-    "namespace" : "`a`.`b`.`c`",
+    "identifier" : "`a`.`b`.`c`.`d`",
     "sessionCatalog" : "spark_catalog"
   },
   "queryContext" : [ {
```
