Skip to content

Commit 19b2143

Browse files
committed
[SPARK-52319] Add (Catalog|Schema|TableOrView)NotFound to SparkConnectError
### What changes were proposed in this pull request? This PR aims to add `(Catalog|Schema|TableOrView)NotFound` to `SparkConnectError`. ### Why are the changes needed? To allow users to catch these exceptions easily instead of matching `internalError` against string patterns. ### Does this PR introduce _any_ user-facing change? Yes, but these are more specific exceptions than before. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #178 from dongjoon-hyun/SPARK-52319. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f715923 commit 19b2143

File tree

3 files changed

+31
-9
lines changed

3 files changed

+31
-9
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,20 @@ public actor DataFrame: Sendable {
299299
),
300300
interceptors: spark.client.getIntercepters()
301301
) { client in
302-
return try await f(client)
302+
do {
303+
return try await f(client)
304+
} catch let error as RPCError where error.code == .internalError {
305+
switch error.message {
306+
case let m where m.contains("CATALOG_NOT_FOUND"):
307+
throw SparkConnectError.CatalogNotFound
308+
case let m where m.contains("SCHEMA_NOT_FOUND"):
309+
throw SparkConnectError.SchemaNotFound
310+
case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
311+
throw SparkConnectError.TableOrViewNotFound
312+
default:
313+
throw error
314+
}
315+
}
303316
}
304317
}
305318

Sources/SparkConnect/SparkConnectError.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
/// An enum for ``SparkConnect`` package errors
2121
public enum SparkConnectError: Error {
22+
case CatalogNotFound
2223
case InvalidArgument
2324
case InvalidSessionID
2425
case InvalidType
26+
case SchemaNotFound
27+
case TableOrViewNotFound
2528
case UnsupportedOperation
2629
}

Tests/SparkConnectTests/CatalogTests.swift

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,14 @@ struct CatalogTests {
3737
func setCurrentCatalog() async throws {
3838
let spark = try await SparkSession.builder.getOrCreate()
3939
try await spark.catalog.setCurrentCatalog("spark_catalog")
40-
try await #require(throws: Error.self) {
41-
try await spark.catalog.setCurrentCatalog("not_exist_catalog")
40+
if await spark.version >= "4.0.0" {
41+
try await #require(throws: SparkConnectError.CatalogNotFound) {
42+
try await spark.catalog.setCurrentCatalog("not_exist_catalog")
43+
}
44+
} else {
45+
try await #require(throws: Error.self) {
46+
try await spark.catalog.setCurrentCatalog("not_exist_catalog")
47+
}
4248
}
4349
await spark.stop()
4450
}
@@ -63,7 +69,7 @@ struct CatalogTests {
6369
func setCurrentDatabase() async throws {
6470
let spark = try await SparkSession.builder.getOrCreate()
6571
try await spark.catalog.setCurrentDatabase("default")
66-
try await #require(throws: Error.self) {
72+
try await #require(throws: SparkConnectError.SchemaNotFound) {
6773
try await spark.catalog.setCurrentDatabase("not_exist_database")
6874
}
6975
await spark.stop()
@@ -91,7 +97,7 @@ struct CatalogTests {
9197
#expect(db.catalog == "spark_catalog")
9298
#expect(db.description == "default database")
9399
#expect(db.locationUri.hasSuffix("spark-warehouse"))
94-
try await #require(throws: Error.self) {
100+
try await #require(throws: SparkConnectError.SchemaNotFound) {
95101
try await spark.catalog.getDatabase("not_exist_database")
96102
}
97103
await spark.stop()
@@ -313,7 +319,7 @@ struct CatalogTests {
313319
try await spark.catalog.cacheTable(tableName, StorageLevel.MEMORY_ONLY)
314320
})
315321

316-
try await #require(throws: Error.self) {
322+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
317323
try await spark.catalog.cacheTable("not_exist_table")
318324
}
319325
await spark.stop()
@@ -330,7 +336,7 @@ struct CatalogTests {
330336
#expect(try await spark.catalog.isCached(tableName))
331337
})
332338

333-
try await #require(throws: Error.self) {
339+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
334340
try await spark.catalog.isCached("not_exist_table")
335341
}
336342
await spark.stop()
@@ -351,7 +357,7 @@ struct CatalogTests {
351357
#expect(try await spark.catalog.isCached(tableName))
352358
})
353359

354-
try await #require(throws: Error.self) {
360+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
355361
try await spark.catalog.refreshTable("not_exist_table")
356362
}
357363
await spark.stop()
@@ -386,7 +392,7 @@ struct CatalogTests {
386392
#expect(try await spark.catalog.isCached(tableName) == false)
387393
})
388394

389-
try await #require(throws: Error.self) {
395+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
390396
try await spark.catalog.uncacheTable("not_exist_table")
391397
}
392398
await spark.stop()

0 commit comments

Comments
 (0)