
Commit 4c498c3

pan3793 authored and dongjoon-hyun committed
[SPARK-54112][CONNECT] Support getSchemas for SparkConnectDatabaseMetaData
### What changes were proposed in this pull request?

Implement the `getSchemas` methods defined in `java.sql.DatabaseMetaData` for `SparkConnectDatabaseMetaData`.

```java
/**
 * Retrieves the schema names available in this database. The results
 * are ordered by {@code TABLE_CATALOG} and
 * {@code TABLE_SCHEM}.
 *
 * <P>The schema columns are:
 *  <OL>
 *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
 *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
 *  </OL>
 *
 * @return a {@code ResultSet} object in which each row is a
 *         schema description
 * @throws SQLException if a database access error occurs
 */
ResultSet getSchemas() throws SQLException;

/**
 * Retrieves the schema names available in this database. The results
 * are ordered by {@code TABLE_CATALOG} and
 * {@code TABLE_SCHEM}.
 *
 * <P>The schema columns are:
 *  <OL>
 *  <LI><B>TABLE_SCHEM</B> String {@code =>} schema name
 *  <LI><B>TABLE_CATALOG</B> String {@code =>} catalog name (may be {@code null})
 *  </OL>
 *
 * @param catalog a catalog name; must match the catalog name as it is stored
 *        in the database; "" retrieves those without a catalog; null means catalog
 *        name should not be used to narrow down the search.
 * @param schemaPattern a schema name; must match the schema name as it is
 *        stored in the database; null means
 *        schema name should not be used to narrow down the search.
 * @return a {@code ResultSet} object in which each row is a
 *         schema description
 * @throws SQLException if a database access error occurs
 * @see #getSearchStringEscape
 * @since 1.6
 */
ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException;
```

### Why are the changes needed?

Enhance API coverage of the Connect JDBC driver; for example, the `get[Catalogs|Schemas|Tables|...]` APIs are used by SQL GUI tools such as DBeaver to display the catalog/schema tree.

### Does this PR introduce _any_ user-facing change?

No, the Connect JDBC driver is a new feature under development.

### How was this patch tested?

A new UT is added; also tested via DBeaver - the catalog/schema tree works now.

<img width="1260" height="892" alt="Xnip2025-11-01_01-33-38" src="https://github.com/user-attachments/assets/ca678627-e07c-430a-9750-e7ea1d69aecf" />

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52819 from pan3793/SPARK-54112.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 52fe51b)
Signed-off-by: Dongjoon Hyun <[email protected]>
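As a quick orientation for readers of this change, the sketch below shows roughly how a JDBC client would drive the newly supported metadata calls. It is not part of the patch: the connection URL `jdbc:sc://localhost:15002`, the driver auto-registration via `DriverManager`, and the example class name are assumptions for illustration only.

```scala
import java.sql.DriverManager
import scala.util.Using

// Hedged sketch, not part of this commit: assumes a Spark Connect server is
// reachable at the placeholder URL and that the Connect JDBC driver registers
// itself with DriverManager.
object GetSchemasExample {
  def main(args: Array[String]): Unit = {
    Using.resource(DriverManager.getConnection("jdbc:sc://localhost:15002")) { conn =>
      val metadata = conn.getMetaData
      // All schemas in all catalogs, ordered by TABLE_CATALOG then TABLE_SCHEM.
      Using.resource(metadata.getSchemas) { rs =>
        while (rs.next()) {
          println(s"${rs.getString("TABLE_CATALOG")}.${rs.getString("TABLE_SCHEM")}")
        }
      }
      // Schemas in a specific catalog matching a LIKE pattern ('_' and '%' are
      // wildcards; escape them with getSearchStringEscape, i.e. a backslash).
      Using.resource(metadata.getSchemas("spark_catalog", "db%")) { rs =>
        while (rs.next()) {
          println(rs.getString("TABLE_SCHEM"))
        }
      }
    }
  }
}
```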
1 parent 9beab99 commit 4c498c3

File tree

3 files changed: +181 −6 lines

sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala

Lines changed: 61 additions & 6 deletions
@@ -20,7 +20,11 @@ package org.apache.spark.sql.connect.client.jdbc
 import java.sql.{Array => _, _}
 
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils._
+import org.apache.spark.sql.connect
 import org.apache.spark.sql.connect.client.jdbc.SparkConnectDatabaseMetaData._
+import org.apache.spark.sql.functions._
 import org.apache.spark.util.VersionUtils
 
 class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
@@ -97,8 +101,7 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
   override def getTimeDateFunctions: String =
     throw new SQLFeatureNotSupportedException
 
-  override def getSearchStringEscape: String =
-    throw new SQLFeatureNotSupportedException
+  override def getSearchStringEscape: String = "\\"
 
   override def getExtraNameCharacters: String = ""
 
@@ -277,6 +280,9 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
 
   override def dataDefinitionIgnoredInTransactions: Boolean = false
 
+  private def isNullOrWildcard(pattern: String): Boolean =
+    pattern == null || pattern == "%"
+
   override def getProcedures(
       catalog: String,
       schemaPattern: String,
@@ -299,11 +305,60 @@ class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends DatabaseMetaData {
     new SparkConnectResultSet(df.collectResult())
   }
 
-  override def getSchemas: ResultSet =
-    throw new SQLFeatureNotSupportedException
+  override def getSchemas: ResultSet = {
+    conn.checkOpen()
 
-  override def getSchemas(catalog: String, schemaPattern: String): ResultSet =
-    throw new SQLFeatureNotSupportedException
+    getSchemas(null, null)
+  }
+
+  // Schema of the returned DataFrame is:
+  // |-- TABLE_SCHEM: string (nullable = false)
+  // |-- TABLE_CATALOG: string (nullable = false)
+  private def getSchemasDataFrame(
+      catalog: String, schemaPattern: String): connect.DataFrame = {
+
+    val schemaFilterExpr = if (isNullOrWildcard(schemaPattern)) {
+      lit(true)
+    } else {
+      $"TABLE_SCHEM".like(schemaPattern)
+    }
+
+    def internalGetSchemas(
+        catalogOpt: Option[String],
+        schemaFilterExpr: Column): connect.DataFrame = {
+      val catalog = catalogOpt.getOrElse(conn.getCatalog)
+      // Spark SQL supports LIKE clause in SHOW SCHEMAS command, but we can't use that
+      // because the LIKE pattern does not follow SQL standard.
+      conn.spark.sql(s"SHOW SCHEMAS IN ${quoteIdentifier(catalog)}")
+        .select($"namespace".as("TABLE_SCHEM"))
+        .filter(schemaFilterExpr)
+        .withColumn("TABLE_CATALOG", lit(catalog))
+    }
+
+    if (catalog == null) {
+      // search in all catalogs
+      val emptyDf = conn.spark.emptyDataFrame
+        .withColumn("TABLE_SCHEM", lit(""))
+        .withColumn("TABLE_CATALOG", lit(""))
+      conn.spark.catalog.listCatalogs().collect().map(_.name).map { c =>
+        internalGetSchemas(Some(c), schemaFilterExpr)
+      }.fold(emptyDf) { (l, r) => l.unionAll(r) }
+    } else if (catalog == "") {
+      // search only in current catalog
+      internalGetSchemas(None, schemaFilterExpr)
+    } else {
+      // search in the specific catalog
+      internalGetSchemas(Some(catalog), schemaFilterExpr)
+    }
+  }
+
+  override def getSchemas(catalog: String, schemaPattern: String): ResultSet = {
+    conn.checkOpen()
+
+    val df = getSchemasDataFrame(catalog, schemaPattern)
+      .orderBy("TABLE_CATALOG", "TABLE_SCHEM")
+    new SparkConnectResultSet(df.collectResult())
+  }
 
   override def getTableTypes: ResultSet =
     throw new SQLFeatureNotSupportedException
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaDataSuite.scala

Lines changed: 108 additions & 0 deletions
@@ -69,6 +69,7 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession {
     assert(metadata.storesLowerCaseQuotedIdentifiers === false)
     assert(metadata.storesMixedCaseQuotedIdentifiers === false)
     assert(metadata.getIdentifierQuoteString === "`")
+    assert(metadata.getSearchStringEscape === "\\")
     assert(metadata.getExtraNameCharacters === "")
     assert(metadata.supportsAlterTableWithAddColumn === true)
     assert(metadata.supportsAlterTableWithDropColumn === true)
@@ -235,4 +236,111 @@ class SparkConnectDatabaseMetaDataSuite extends ConnectFunSuite with RemoteSparkSession {
       }
     }
   }
+
+  test("SparkConnectDatabaseMetaData getSchemas") {
+
+    def verifyGetSchemas(
+        getSchemas: () => ResultSet)(verify: Seq[(String, String)] => Unit): Unit = {
+      Using.resource(getSchemas()) { rs =>
+        val catalogDatabases = new Iterator[(String, String)] {
+          def hasNext: Boolean = rs.next()
+          def next(): (String, String) =
+            (rs.getString("TABLE_CATALOG"), rs.getString("TABLE_SCHEM"))
+        }.toSeq
+        verify(catalogDatabases)
+      }
+    }
+
+    withConnection { conn =>
+      implicit val spark: SparkSession = conn.asInstanceOf[SparkConnectConnection].spark
+
+      registerCatalog("test`cat", TEST_IN_MEMORY_CATALOG)
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db1")
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db2")
+      spark.sql("CREATE DATABASE IF NOT EXISTS `test``cat`.t_db_")
+
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db1")
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.db2")
+      spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.test_db3")
+
+      val metadata = conn.getMetaData
+
+      // no need to care about "test`cat" because it is memory based and session isolated,
+      // also is inaccessible from another SparkSession
+      withDatabase("spark_catalog.db1", "spark_catalog.db2", "spark_catalog.test_db3") {
+        // list schemas in all catalogs
+        val getSchemasInAllCatalogs = (() => metadata.getSchemas) ::
+          List(null, "%").map { database => () => metadata.getSchemas(null, database) } ::: Nil
+
+        getSchemasInAllCatalogs.foreach { getSchemas =>
+          verifyGetSchemas(getSchemas) { catalogDatabases =>
+            // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+            assert {
+              catalogDatabases === Seq(
+                ("spark_catalog", "db1"),
+                ("spark_catalog", "db2"),
+                ("spark_catalog", "default"),
+                ("spark_catalog", "test_db3"),
+                ("test`cat", "t_db1"),
+                ("test`cat", "t_db2"),
+                ("test`cat", "t_db_"))
+            }
+          }
+        }
+
+        // list schemas in current catalog
+        assert(conn.getCatalog === "spark_catalog")
+        val getSchemasInCurrentCatalog =
+          List(null, "%").map { database => () => metadata.getSchemas("", database) }
+        getSchemasInCurrentCatalog.foreach { getSchemas =>
+          verifyGetSchemas(getSchemas) { catalogDatabases =>
+            // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+            assert {
+              catalogDatabases === Seq(
+                ("spark_catalog", "db1"),
+                ("spark_catalog", "db2"),
+                ("spark_catalog", "default"),
+                ("spark_catalog", "test_db3"))
+            }
+          }
+        }
+
+        // list schemas with schema pattern
+        verifyGetSchemas { () => metadata.getSchemas(null, "db%") } { catalogDatabases =>
+          // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+          assert {
+            catalogDatabases === Seq(
+              ("spark_catalog", "db1"),
+              ("spark_catalog", "db2"))
+          }
+        }
+
+        verifyGetSchemas { () => metadata.getSchemas(null, "db_") } { catalogDatabases =>
+          // results are ordered by TABLE_CATALOG, TABLE_SCHEM
+          assert {
+            catalogDatabases === Seq(
+              ("spark_catalog", "db1"),
+              ("spark_catalog", "db2"))
+          }
+        }
+
+        // escape backtick in catalog, and _ in schema pattern
+        verifyGetSchemas {
+          () => metadata.getSchemas("test`cat", "t\\_db\\_")
+        } { catalogDatabases =>
+          assert(catalogDatabases === Seq(("test`cat", "t_db_")))
+        }
+
+        // skip testing escape ', % in schema pattern, because Spark SQL does not
+        // allow using those chars in schema table name.
+        //
+        //   CREATE DATABASE IF NOT EXISTS `t_db1'`;
+        //
+        // the above SQL fails with error condition:
+        //   [INVALID_SCHEMA_OR_RELATION_NAME] `t_db1'` is not a valid name for tables/schemas.
+        //   Valid names only contain alphabet characters, numbers and _. SQLSTATE: 42602
+      }
+    }
+  }
 }

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/SQLHelper.scala

Lines changed: 12 additions & 0 deletions
@@ -142,4 +142,16 @@ trait SQLHelper {
       spark.sql(s"DROP VIEW IF EXISTS $name")
     })
   }
+
+  /**
+   * Drops databases `dbNames` after calling `f`.
+   */
+  protected def withDatabase(dbNames: String*)(f: => Unit): Unit = {
+    SparkErrorUtils.tryWithSafeFinally(f) {
+      dbNames.foreach { name =>
+        spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE")
+      }
+      spark.sql(s"USE default")
+    }
+  }
 }
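For completeness, a hedged sketch of how the new helper would typically be used from a suite that mixes in `SQLHelper`; the database and table names below are made up for illustration.

```scala
// Hypothetical usage inside a test mixing in SQLHelper: every listed database is
// dropped with CASCADE once the block finishes, and the session returns to `default`.
withDatabase("spark_catalog.tmp_db") {
  spark.sql("CREATE DATABASE spark_catalog.tmp_db")
  spark.sql("CREATE TABLE spark_catalog.tmp_db.t (id INT) USING parquet")
  assert(spark.sql("SHOW SCHEMAS IN spark_catalog LIKE 'tmp_db'").count() == 1)
}
```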
