
Commit fc9d421

amaliujia authored and cloud-fan committed
[SPARK-49211][SQL][FOLLOW-UP] Support catalog in QualifiedTableName
### What changes were proposed in this pull request?

Support catalog in QualifiedTableName and remove `FullQualifiedTableName`.

### Why are the changes needed?

Consolidate and remove duplicate code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #48255 from amaliujia/qualifedtablename.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
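To make the consolidation concrete, here is a small illustrative sketch (not part of the diff): call sites that previously constructed a `FullQualifiedTableName` now construct a `QualifiedTableName` with the same catalog/database/table arguments.

  import org.apache.spark.sql.catalyst.QualifiedTableName
  import org.apache.spark.sql.connector.catalog.CatalogManager

  // Before this change (class now removed):
  //   FullQualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "default", "t")
  // After this change, the single consolidated class carries the catalog:
  val key = QualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "default", "t")
  // Renders the fully qualified name, e.g. "spark_catalog.default.t"
  println(key.toString)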
1 parent 339dd5b commit fc9d421

10 files changed: +52, -39 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 9 additions & 9 deletions
@@ -197,41 +197,41 @@ class SessionCatalog(
     }
   }

-  private val tableRelationCache: Cache[FullQualifiedTableName, LogicalPlan] = {
+  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     var builder = CacheBuilder.newBuilder()
       .maximumSize(cacheSize)

     if (cacheTTL > 0) {
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
     }

-    builder.build[FullQualifiedTableName, LogicalPlan]()
+    builder.build[QualifiedTableName, LogicalPlan]()
   }

   /** This method provides a way to get a cached plan. */
-  def getCachedPlan(t: FullQualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
     tableRelationCache.get(t, c)
   }

   /** This method provides a way to get a cached plan if the key exists. */
-  def getCachedTable(key: FullQualifiedTableName): LogicalPlan = {
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
     tableRelationCache.getIfPresent(key)
   }

   /** This method provides a way to cache a plan. */
-  def cacheTable(t: FullQualifiedTableName, l: LogicalPlan): Unit = {
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
     tableRelationCache.put(t, l)
   }

   /** This method provides a way to invalidate a cached plan. */
-  def invalidateCachedTable(key: FullQualifiedTableName): Unit = {
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
     tableRelationCache.invalidate(key)
   }

   /** This method discards any cached table relation plans for the given table identifier. */
   def invalidateCachedTable(name: TableIdentifier): Unit = {
     val qualified = qualifyIdentifier(name)
-    invalidateCachedTable(FullQualifiedTableName(
+    invalidateCachedTable(QualifiedTableName(
       qualified.catalog.get, qualified.database.get, qualified.table))
   }

@@ -301,7 +301,7 @@ class SessionCatalog(
     }
     if (cascade && databaseExists(dbName)) {
       listTables(dbName).foreach { t =>
-        invalidateCachedTable(FullQualifiedTableName(SESSION_CATALOG_NAME, dbName, t.table))
+        invalidateCachedTable(QualifiedTableName(SESSION_CATALOG_NAME, dbName, t.table))
       }
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
@@ -1183,7 +1183,7 @@ class SessionCatalog(
   def refreshTable(name: TableIdentifier): Unit = synchronized {
     getLocalOrGlobalTempView(name).map(_.refresh()).getOrElse {
       val qualifiedIdent = qualifyIdentifier(name)
-      val qualifiedTableName = FullQualifiedTableName(
+      val qualifiedTableName = QualifiedTableName(
         qualifiedIdent.catalog.get, qualifiedIdent.database.get, qualifiedIdent.table)
       tableRelationCache.invalidate(qualifiedTableName)
     }
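A minimal sketch of how the relation cache above is driven with the new key type; `cachedLookup` and `resolve` are hypothetical names standing in for whatever builds the table's LogicalPlan, and the body only strings together the cache calls shown in this diff.

  import org.apache.spark.sql.catalyst.QualifiedTableName
  import org.apache.spark.sql.catalyst.catalog.SessionCatalog
  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.spark.sql.connector.catalog.CatalogManager

  def cachedLookup(catalog: SessionCatalog, resolve: () => LogicalPlan): LogicalPlan = {
    val key = QualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "default", "t")
    // Compute-if-absent: the Callable thunk runs only on a cache miss.
    val plan = catalog.getCachedPlan(key, () => resolve())
    catalog.cacheTable(key, plan)        // explicit put
    catalog.invalidateCachedTable(key)   // drop this entry again
    plan
  }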

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala

Lines changed: 17 additions & 4 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst

+import org.apache.spark.sql.connector.catalog.CatalogManager
+
 /**
  * An identifier that optionally specifies a database.
  *
@@ -107,14 +109,25 @@ case class TableIdentifier(table: String, database: Option[String], catalog: Opt
 }

 /** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String) {
-  override def toString: String = s"$database.$name"
-}
+case class QualifiedTableName(catalog: String, database: String, name: String) {
+  /** Two argument ctor for backward compatibility. */
+  def this(database: String, name: String) = this(
+    catalog = CatalogManager.SESSION_CATALOG_NAME,
+    database = database,
+    name = name)

-case class FullQualifiedTableName(catalog: String, database: String, name: String) {
   override def toString: String = s"$catalog.$database.$name"
 }

+object QualifiedTableName {
+  def apply(catalog: String, database: String, name: String): QualifiedTableName = {
+    new QualifiedTableName(catalog, database, name)
+  }
+
+  def apply(database: String, name: String): QualifiedTableName =
+    new QualifiedTableName(database = database, name = name)
+}
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
   def apply(table: String, database: Option[String]): TableIdentifier =
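A short usage sketch (illustration only) of the updated identifier: the three-argument form names the catalog explicitly, while the two-argument form kept for backward compatibility defaults to the session catalog.

  import org.apache.spark.sql.catalyst.QualifiedTableName
  import org.apache.spark.sql.connector.catalog.CatalogManager

  val v2Key = QualifiedTableName("testcat", "default", "t")
  assert(v2Key.toString == "testcat.default.t")

  // Old two-argument call sites keep compiling; the catalog defaults to
  // CatalogManager.SESSION_CATALOG_NAME.
  val v1Key = QualifiedTableName("default", "t")
  assert(v1Key.catalog == CatalogManager.SESSION_CATALOG_NAME)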

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
 import org.scalatest.concurrent.Eventually

 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{AliasIdentifier, FullQualifiedTableName, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1883,7 +1883,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)

     withConfAndEmptyCatalog(conf) { catalog =>
-      val table = FullQualifiedTableName(
+      val table = QualifiedTableName(
         CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "test")

       // First, make sure the test table is not cached.
@@ -1903,14 +1903,14 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   test("SPARK-34197: refreshTable should not invalidate the relation cache for temporary views") {
     withBasicCatalog { catalog =>
       createTempView(catalog, "tbl1", Range(1, 10, 1, 10), false)
-      val qualifiedName1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "tbl1")
+      val qualifiedName1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "tbl1")
       catalog.cacheTable(qualifiedName1, Range(1, 10, 1, 10))
       catalog.refreshTable(TableIdentifier("tbl1"))
       assert(catalog.getCachedTable(qualifiedName1) != null)

       createGlobalTempView(catalog, "tbl2", Range(2, 10, 1, 10), false)
       val qualifiedName2 =
-        FullQualifiedTableName(SESSION_CATALOG_NAME, catalog.globalTempDatabase, "tbl2")
+        QualifiedTableName(SESSION_CATALOG_NAME, catalog.globalTempDatabase, "tbl2")
       catalog.cacheTable(qualifiedName2, Range(2, 10, 1, 10))
       catalog.refreshTable(TableIdentifier("tbl2", Some(catalog.globalTempDatabase)))
       assert(catalog.getCachedTable(qualifiedName2) != null)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PREDICATES
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, FullQualifiedTableName, InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow, QualifiedTableName, SQLConfHelper}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
@@ -249,7 +249,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   private def readDataSourceTable(
       table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = {
     val qualifiedTableName =
-      FullQualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table)
+      QualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table)
     catalog.getCachedPlan(qualifiedTableName, () => {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._

 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, FunctionIdentifier, SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, SessionCatalog}
 import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -93,7 +93,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
     // table here. To avoid breaking it we do not resolve the table provider and still return
     // `V1Table` if the custom session catalog is present.
     if (table.provider.isDefined && !hasCustomSessionCatalog) {
-      val qualifiedTableName = FullQualifiedTableName(
+      val qualifiedTableName = QualifiedTableName(
         table.identifier.catalog.get, table.database, table.identifier.table)
       // Check if the table is in the v1 table cache to skip the v2 table lookup.
       if (catalog.getCachedTable(qualifiedTableName) != null) {

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import java.time.LocalDateTime
 import scala.collection.mutable
 import scala.util.Random

-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.AttributeMap
@@ -270,7 +270,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils

   def getTableFromCatalogCache(tableName: String): LogicalPlan = {
     val catalog = spark.sessionState.catalog
-    val qualifiedTableName = FullQualifiedTableName(
+    val qualifiedTableName = QualifiedTableName(
       CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, tableName)
     catalog.getCachedTable(qualifiedTableName)
   }

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 5 additions & 5 deletions
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._

 import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
@@ -3713,7 +3713,7 @@ class DataSourceV2SQLSuiteV1Filter

     // Reset CatalogManager to clear the materialized `spark_catalog` instance, so that we can
     // configure a new implementation.
-    val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+    val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
     spark.sessionState.catalogManager.reset()
     withSQLConf(
       V2_SESSION_CATALOG_IMPLEMENTATION.key ->
@@ -3722,7 +3722,7 @@
         checkParquet(table1.toString, path.getAbsolutePath)
       }
     }
-    val table2 = FullQualifiedTableName("testcat3", "default", "t")
+    val table2 = QualifiedTableName("testcat3", "default", "t")
     withSQLConf(
       "spark.sql.catalog.testcat3" -> classOf[V2CatalogSupportBuiltinDataSource].getName) {
       withTempPath { path =>
@@ -3741,7 +3741,7 @@
     // Reset CatalogManager to clear the materialized `spark_catalog` instance, so that we can
     // configure a new implementation.
     spark.sessionState.catalogManager.reset()
-    val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+    val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
     withSQLConf(
       V2_SESSION_CATALOG_IMPLEMENTATION.key ->
        classOf[V2CatalogSupportBuiltinDataSource].getName) {
@@ -3750,7 +3750,7 @@
      }
    }

-    val table2 = FullQualifiedTableName("testcat3", "default", "t")
+    val table2 = QualifiedTableName("testcat3", "default", "t")
     withSQLConf(
       "spark.sql.catalog.testcat3" -> classOf[V2CatalogSupportBuiltinDataSource].getName) {
       withTempPath { path =>

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
 import org.apache.spark.{SparkClassNotFoundException, SparkException, SparkFiles, SparkRuntimeException}
 import org.apache.spark.internal.config
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -219,7 +219,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
   test("SPARK-25403 refresh the table after inserting data") {
     withTable("t") {
       val catalog = spark.sessionState.catalog
-      val table = FullQualifiedTableName(
+      val table = QualifiedTableName(
         CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
       sql("CREATE TABLE t (a INT) USING parquet")
       sql("INSERT INTO TABLE t VALUES (1)")
@@ -233,7 +233,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
     withTable("t") {
       withTempDir { dir =>
         val catalog = spark.sessionState.catalog
-        val table = FullQualifiedTableName(
+        val table = QualifiedTableName(
           CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
         val p1 = s"${dir.getCanonicalPath}/p1"
         val p2 = s"${dir.getCanonicalPath}/p2"

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, FsAction, FsPermission}

 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.execution.command.FakeLocalFsFileSystem
@@ -148,7 +148,7 @@ trait TruncateTableSuiteBase extends command.TruncateTableSuiteBase {

       val catalog = spark.sessionState.catalog
       val qualifiedTableName =
-        FullQualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns", "tbl")
+        QualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns", "tbl")
       val cachedPlan = catalog.getCachedTable(qualifiedTableName)
       assert(cachedPlan.stats.sizeInBytes == 0)
     }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 6 additions & 6 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -56,7 +56,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   private val tableCreationLocks = Striped.lazyWeakLock(100)

   /** Acquires a lock on the table cache for the duration of `f`. */
-  private def withTableCreationLock[A](tableName: FullQualifiedTableName, f: => A): A = {
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = {
     val lock = tableCreationLocks.get(tableName)
     lock.lock()
     try f finally {
@@ -66,7 +66,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
-    val key = FullQualifiedTableName(
+    val key = QualifiedTableName(
       // scalastyle:off caselocale
       table.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME).toLowerCase,
       table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
@@ -76,7 +76,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }

   private def getCached(
-      tableIdentifier: FullQualifiedTableName,
+      tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -120,7 +120,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }

   private def logWarningUnexpectedFileFormat(
-      tableIdentifier: FullQualifiedTableName,
+      tableIdentifier: QualifiedTableName,
       expectedFileFormat: Class[_ <: FileFormat],
       actualFileFormat: String): Unit = {
     logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored as " +
@@ -201,7 +201,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       fileType: String,
       isWrite: Boolean): LogicalRelation = {
     val metastoreSchema = relation.tableMeta.schema
-    val tableIdentifier = FullQualifiedTableName(relation.tableMeta.identifier.catalog.get,
+    val tableIdentifier = QualifiedTableName(relation.tableMeta.identifier.catalog.get,
       relation.tableMeta.database, relation.tableMeta.identifier.table)

     val lazyPruningEnabled = sparkSession.sessionState.conf.manageFilesourcePartitions
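For context, a minimal sketch of the striped-lock pattern that `withTableCreationLock` above relies on, now keyed by `QualifiedTableName` (assumes Guava on the classpath; the wrapping object and its name are only for illustration).

  import com.google.common.util.concurrent.Striped
  import org.apache.spark.sql.catalyst.QualifiedTableName

  object TableLockExample {
    // A fixed pool of 100 lazily created, weakly referenced locks; equal keys
    // map to the same lock, so concurrent creation of one table is serialized.
    private val tableCreationLocks = Striped.lazyWeakLock(100)

    def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = {
      val lock = tableCreationLocks.get(tableName)
      lock.lock()
      try f finally lock.unlock()
    }
  }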

0 commit comments