Skip to content

Commit 603f0e4

Browse files
committed
[SPARK-52319] Add (Catalog|Schema|TableOrView)NotFound to SparkConnectError
1 parent f715923 commit 603f0e4

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,20 @@ public actor DataFrame: Sendable {
299299
),
300300
interceptors: spark.client.getIntercepters()
301301
) { client in
302-
return try await f(client)
302+
do {
303+
return try await f(client)
304+
} catch let error as RPCError where error.code == .internalError {
305+
switch error.message {
306+
case let m where m.contains("CATALOG_NOT_FOUND"):
307+
throw SparkConnectError.CatalogNotFound
308+
case let m where m.contains("SCHEMA_NOT_FOUND"):
309+
throw SparkConnectError.SchemaNotFound
310+
case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
311+
throw SparkConnectError.TableOrViewNotFound
312+
default:
313+
throw error
314+
}
315+
}
303316
}
304317
}
305318

Sources/SparkConnect/SparkConnectError.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
/// A enum for ``SparkConnect`` package errors
2121
public enum SparkConnectError: Error {
22+
case CatalogNotFound
2223
case InvalidArgument
2324
case InvalidSessionID
2425
case InvalidType
26+
case SchemaNotFound
27+
case TableOrViewNotFound
2528
case UnsupportedOperation
2629
}

Tests/SparkConnectTests/CatalogTests.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct CatalogTests {
3737
func setCurrentCatalog() async throws {
3838
let spark = try await SparkSession.builder.getOrCreate()
3939
try await spark.catalog.setCurrentCatalog("spark_catalog")
40-
try await #require(throws: Error.self) {
40+
try await #require(throws: SparkConnectError.CatalogNotFound) {
4141
try await spark.catalog.setCurrentCatalog("not_exist_catalog")
4242
}
4343
await spark.stop()
@@ -63,7 +63,7 @@ struct CatalogTests {
6363
func setCurrentDatabase() async throws {
6464
let spark = try await SparkSession.builder.getOrCreate()
6565
try await spark.catalog.setCurrentDatabase("default")
66-
try await #require(throws: Error.self) {
66+
try await #require(throws: SparkConnectError.SchemaNotFound) {
6767
try await spark.catalog.setCurrentDatabase("not_exist_database")
6868
}
6969
await spark.stop()
@@ -91,7 +91,7 @@ struct CatalogTests {
9191
#expect(db.catalog == "spark_catalog")
9292
#expect(db.description == "default database")
9393
#expect(db.locationUri.hasSuffix("spark-warehouse"))
94-
try await #require(throws: Error.self) {
94+
try await #require(throws: SparkConnectError.SchemaNotFound) {
9595
try await spark.catalog.getDatabase("not_exist_database")
9696
}
9797
await spark.stop()
@@ -313,7 +313,7 @@ struct CatalogTests {
313313
try await spark.catalog.cacheTable(tableName, StorageLevel.MEMORY_ONLY)
314314
})
315315

316-
try await #require(throws: Error.self) {
316+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
317317
try await spark.catalog.cacheTable("not_exist_table")
318318
}
319319
await spark.stop()
@@ -330,7 +330,7 @@ struct CatalogTests {
330330
#expect(try await spark.catalog.isCached(tableName))
331331
})
332332

333-
try await #require(throws: Error.self) {
333+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
334334
try await spark.catalog.isCached("not_exist_table")
335335
}
336336
await spark.stop()
@@ -351,7 +351,7 @@ struct CatalogTests {
351351
#expect(try await spark.catalog.isCached(tableName))
352352
})
353353

354-
try await #require(throws: Error.self) {
354+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
355355
try await spark.catalog.refreshTable("not_exist_table")
356356
}
357357
await spark.stop()
@@ -386,7 +386,7 @@ struct CatalogTests {
386386
#expect(try await spark.catalog.isCached(tableName) == false)
387387
})
388388

389-
try await #require(throws: Error.self) {
389+
try await #require(throws: SparkConnectError.TableOrViewNotFound) {
390390
try await spark.catalog.uncacheTable("not_exist_table")
391391
}
392392
await spark.stop()

0 commit comments

Comments
 (0)