Skip to content

Commit a39c346

Browse files
committed
[SPARK-51969] Support createTable and (table|function)Exists in Catalog
### What changes were proposed in this pull request? This PR aims to support `createTable`, `tableExists`, and `functionExists` APIs in `Catalog`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #97 from dongjoon-hyun/SPARK-51969. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent cee0edf commit a39c346

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

Sources/SparkConnect/Catalog.swift

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,114 @@ public actor Catalog: Sendable {
200200
return try await self.listDatabases(pattern: dbName).count > 0
201201
}
202202

203+
/// Creates a table from the given path and returns the corresponding ``DataFrame``.
204+
/// - Parameters:
205+
/// - tableName: A qualified or unqualified name that designates a table. If no database
206+
/// identifier is provided, it refers to a table in the current database.
207+
/// - path: A path to load a table.
208+
/// - source: A data source.
209+
/// - description: A table description.
210+
/// - options: A dictionary for table options
211+
/// - Returns: A ``DataFrame``.
212+
public func createTable(
213+
_ tableName: String,
214+
_ path: String? = nil,
215+
source: String? = nil,
216+
description: String? = nil,
217+
options: [String: String]? = nil
218+
) -> DataFrame {
219+
let df = getDataFrame({
220+
var createTable = Spark_Connect_CreateTable()
221+
createTable.tableName = tableName
222+
if let source {
223+
createTable.source = source
224+
}
225+
createTable.description_p = description ?? ""
226+
if let options {
227+
for (k, v) in options {
228+
createTable.options[k] = v
229+
}
230+
}
231+
if let path {
232+
createTable.options["path"] = path
233+
}
234+
var catalog = Spark_Connect_Catalog()
235+
catalog.createTable = createTable
236+
return catalog
237+
})
238+
return df
239+
}
240+
241+
/// Check if the table or view with the specified name exists. This can either be a temporary
242+
/// view or a table/view.
243+
/// - Parameter tableName: a qualified or unqualified name that designates a table/view. It follows the same
244+
/// resolution rule with SQL: search for temp views first then table/views in the current
245+
/// database (namespace).
246+
/// - Returns: Return true if it exists.
247+
public func tableExists(_ tableName: String) async throws -> Bool {
248+
let df = getDataFrame({
249+
var tableExists = Spark_Connect_TableExists()
250+
tableExists.tableName = tableName
251+
var catalog = Spark_Connect_Catalog()
252+
catalog.tableExists = tableExists
253+
return catalog
254+
})
255+
return "true" == (try await df.collect().first!.get(0) as! String)
256+
}
257+
258+
/// Check if the table or view with the specified name exists. This can either be a temporary
259+
/// view or a table/view.
260+
/// - Parameters:
261+
/// - dbName: an unqualified name that designates a database.
262+
/// - tableName: an unqualified name that designates a table.
263+
/// - Returns: Return true if it exists.
264+
public func tableExists(_ dbName: String, _ tableName: String) async throws -> Bool {
265+
let df = getDataFrame({
266+
var tableExists = Spark_Connect_TableExists()
267+
tableExists.tableName = tableName
268+
tableExists.dbName = dbName
269+
var catalog = Spark_Connect_Catalog()
270+
catalog.tableExists = tableExists
271+
return catalog
272+
})
273+
return "true" == (try await df.collect().first!.get(0) as! String)
274+
}
275+
276+
/// Check if the function with the specified name exists. This can either be a temporary function
277+
/// or a function.
278+
/// - Parameter functionName: a qualified or unqualified name that designates a function. It follows the same
279+
/// resolution rule with SQL: search for built-in/temp functions first then functions in the
280+
/// current database (namespace).
281+
/// - Returns: Return true if it exists.
282+
public func functionExists(_ functionName: String) async throws -> Bool {
283+
let df = getDataFrame({
284+
var functionExists = Spark_Connect_FunctionExists()
285+
functionExists.functionName = functionName
286+
var catalog = Spark_Connect_Catalog()
287+
catalog.functionExists = functionExists
288+
return catalog
289+
})
290+
return "true" == (try await df.collect().first!.get(0) as! String)
291+
}
292+
293+
/// Check if the function with the specified name exists in the specified database under the Hive
294+
/// Metastore.
295+
/// - Parameters:
296+
/// - dbName: an unqualified name that designates a database.
297+
/// - functionName: an unqualified name that designates a function.
298+
/// - Returns: Return true if it exists.
299+
public func functionExists(_ dbName: String, _ functionName: String) async throws -> Bool {
300+
let df = getDataFrame({
301+
var functionExists = Spark_Connect_FunctionExists()
302+
functionExists.functionName = functionName
303+
functionExists.dbName = dbName
304+
var catalog = Spark_Connect_Catalog()
305+
catalog.functionExists = functionExists
306+
return catalog
307+
})
308+
return "true" == (try await df.collect().first!.get(0) as! String)
309+
}
310+
203311
/// Caches the specified table in-memory.
204312
/// - Parameters:
205313
/// - tableName: A qualified or unqualified name that designates a table/view.

Tests/SparkConnectTests/CatalogTests.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,50 @@ struct CatalogTests {
110110
#expect(try await spark.catalog.databaseExists(dbName) == false)
111111
await spark.stop()
112112
}
113+
114+
@Test
115+
func createTable() async throws {
116+
let spark = try await SparkSession.builder.getOrCreate()
117+
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
118+
try await SQLHelper.withTable(spark, tableName)({
119+
try await spark.range(1).write.orc("/tmp/\(tableName)")
120+
#expect(try await spark.catalog.createTable(tableName, "/tmp/\(tableName)", source: "orc").count() == 1)
121+
#expect(try await spark.catalog.tableExists(tableName))
122+
})
123+
await spark.stop()
124+
}
125+
126+
@Test
127+
func tableExists() async throws {
128+
let spark = try await SparkSession.builder.getOrCreate()
129+
let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
130+
try await SQLHelper.withTable(spark, tableName)({
131+
try await spark.range(1).write.parquet("/tmp/\(tableName)")
132+
#expect(try await spark.catalog.tableExists(tableName) == false)
133+
#expect(try await spark.catalog.createTable(tableName, "/tmp/\(tableName)").count() == 1)
134+
#expect(try await spark.catalog.tableExists(tableName))
135+
#expect(try await spark.catalog.tableExists("default", tableName))
136+
#expect(try await spark.catalog.tableExists("default2", tableName) == false)
137+
})
138+
#expect(try await spark.catalog.tableExists(tableName) == false)
139+
140+
try await #require(throws: Error.self) {
141+
try await spark.catalog.tableExists("invalid table name")
142+
}
143+
await spark.stop()
144+
}
145+
146+
@Test
147+
func functionExists() async throws {
148+
let spark = try await SparkSession.builder.getOrCreate()
149+
#expect(try await spark.catalog.functionExists("base64"))
150+
#expect(try await spark.catalog.functionExists("non_exist_function") == false)
151+
152+
try await #require(throws: Error.self) {
153+
try await spark.catalog.functionExists("invalid function name")
154+
}
155+
await spark.stop()
156+
}
113157
#endif
114158

115159
@Test

0 commit comments

Comments
 (0)