
Commit b0ba4a0

[SPARK-52320] Add ColumnNotFound/InvalidViewName/TableOrViewAlreadyExists to SparkConnectError
### What changes were proposed in this pull request?

This PR aims to add `ColumnNotFound`, `InvalidViewName`, and `TableOrViewAlreadyExists` to `SparkConnectError`.

### Why are the changes needed?

So that users can catch these exceptions directly instead of matching `internalError` against string patterns.

### Does this PR introduce _any_ user-facing change?

Yes, but these are more specific exceptions than before.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #179 from dongjoon-hyun/SPARK-52320.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 19b2143 commit b0ba4a0

8 files changed (+72, -28 lines)
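As a rough, illustrative sketch of the user-facing benefit (not part of this commit; the view name, messages, and helper function are invented for illustration), a caller can now pattern-match the new cases directly instead of inspecting `internalError` message strings:

```swift
import SparkConnect

// Illustrative only: assumes a reachable Spark Connect server.
func createPeopleView() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  do {
    try await spark.range(1).createTempView("people")
  } catch SparkConnectError.TableOrViewAlreadyExists {
    // Previously this surfaced as a generic internal RPC error whose message
    // had to be string-matched; now it is a dedicated enum case.
    print("View 'people' already exists; skipping creation.")
  } catch SparkConnectError.InvalidViewName {
    print("The requested view name is not valid.")
  }
  await spark.stop()
}
```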

Sources/SparkConnect/DataFrame.swift

Lines changed: 2 additions & 0 deletions
@@ -309,6 +309,8 @@ public actor DataFrame: Sendable {
         throw SparkConnectError.SchemaNotFound
       case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
         throw SparkConnectError.TableOrViewNotFound
+      case let m where m.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"):
+        throw SparkConnectError.ColumnNotFound
       default:
         throw error
       }
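
With this mapping, an unresolved column surfaces as `SparkConnectError.ColumnNotFound`. A hypothetical caller (the column name and helper below are illustrative, not from this commit) might handle it like this:

```swift
import SparkConnect

// Illustrative sketch: selecting a non-existent column now throws
// SparkConnectError.ColumnNotFound instead of a generic internal RPC error.
func printSchemaIfResolvable(_ spark: SparkSession) async throws {
  do {
    let schema = try await spark.range(1).select("no_such_column").schema
    print(schema)
  } catch SparkConnectError.ColumnNotFound {
    print("Column 'no_such_column' could not be resolved.")
  }
}
```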

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 14 additions & 1 deletion
@@ -127,7 +127,20 @@ public actor SparkConnectClient {
       ),
       interceptors: self.intercepters
     ) { client in
-      return try await f(client)
+      do {
+        return try await f(client)
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("TABLE_OR_VIEW_ALREADY_EXISTS"):
+          throw SparkConnectError.TableOrViewAlreadyExists
+        case let m where m.contains("TABLE_OR_VIEW_NOT_FOUND"):
+          throw SparkConnectError.TableOrViewNotFound
+        case let m where m.contains("Invalid view name:"):
+          throw SparkConnectError.InvalidViewName
+        default:
+          throw error
+        }
+      }
     }
   }

Sources/SparkConnect/SparkConnectError.swift

Lines changed: 3 additions & 0 deletions
@@ -20,10 +20,13 @@
 /// A enum for ``SparkConnect`` package errors
 public enum SparkConnectError: Error {
   case CatalogNotFound
+  case ColumnNotFound
   case InvalidArgument
   case InvalidSessionID
   case InvalidType
+  case InvalidViewName
   case SchemaNotFound
+  case TableOrViewAlreadyExists
   case TableOrViewNotFound
   case UnsupportedOperation
 }
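
Because `SparkConnectError` is a plain Swift enum, the new cases compose with ordinary `catch` pattern matching and a generic fallback. A minimal sketch (the table name, helper, and messages are invented for illustration):

```swift
import SparkConnect

// Illustrative sketch: handle a newly added case specifically, fall back otherwise.
func saveOrReport(_ spark: SparkSession, tableName: String) async {
  do {
    try await spark.range(10).write.saveAsTable(tableName)
  } catch SparkConnectError.TableOrViewAlreadyExists {
    print("\(tableName) already exists; drop it first or pick another name.")
  } catch {
    print("Unexpected failure while saving \(tableName): \(error)")
  }
}
```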

Tests/SparkConnectTests/CatalogTests.swift

Lines changed: 6 additions & 6 deletions
@@ -205,12 +205,12 @@ struct CatalogTests {
       try await spark.range(1).createTempView(viewName)
       #expect(try await spark.catalog.tableExists(viewName))

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).createTempView(viewName)
       }
     })

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createTempView("invalid view name")
     }

@@ -228,7 +228,7 @@
       try await spark.range(1).createOrReplaceTempView(viewName)
     })

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceTempView("invalid view name")
     }

@@ -244,13 +244,13 @@
       try await spark.range(1).createGlobalTempView(viewName)
       #expect(try await spark.catalog.tableExists("global_temp.\(viewName)"))

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).createGlobalTempView(viewName)
       }
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createGlobalTempView("invalid view name")
     }

@@ -269,7 +269,7 @@
     })
     #expect(try await spark.catalog.tableExists("global_temp.\(viewName)") == false)

-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.InvalidViewName) {
       try await spark.range(1).createOrReplaceGlobalTempView("invalid view name")
     }

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 2 additions & 2 deletions
@@ -235,10 +235,10 @@ struct DataFrameTests {
   @Test
   func selectInvalidColumn() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("invalid").schema
     }
-    try await #require(throws: Error.self) {
+    try await #require(throws: SparkConnectError.ColumnNotFound) {
       try await spark.range(1).select("id + 1").schema
     }
     await spark.stop()

Tests/SparkConnectTests/DataFrameWriterTests.swift

Lines changed: 2 additions & 3 deletions
@@ -112,7 +112,7 @@ struct DataFrameWriterTests {
       try await spark.range(1).write.saveAsTable(tableName)
       #expect(try await spark.read.table(tableName).count() == 1)

-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await spark.range(1).write.saveAsTable(tableName)
       }

@@ -130,8 +130,7 @@
     let spark = try await SparkSession.builder.getOrCreate()
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
-      // Table doesn't exist.
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewNotFound) {
         try await spark.range(1).write.insertInto(tableName)
       }

Tests/SparkConnectTests/DataFrameWriterV2Tests.swift

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ struct DataFrameWriterV2Tests {
       let write = try await spark.range(2).writeTo(tableName).using("orc")
       try await write.create()
       #expect(try await spark.table(tableName).count() == 2)
-      try await #require(throws: Error.self) {
+      try await #require(throws: SparkConnectError.TableOrViewAlreadyExists) {
         try await write.create()
       }
     })

Tests/SparkConnectTests/MergeIntoWriterTests.swift

Lines changed: 42 additions & 15 deletions
@@ -31,11 +31,20 @@ struct MergeIntoWriterTests {
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenMatched("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenMatched("true").delete().merge()
+        }
       }
     })
     await spark.stop()
@@ -47,11 +56,20 @@
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched().insertAll().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatched("true").insertAll().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched().insertAll().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatched("true").insertAll().merge()
+        }
       }
     })
     await spark.stop()
@@ -63,11 +81,20 @@
     let tableName = "TABLE_" + UUID().uuidString.replacingOccurrences(of: "-", with: "")
     try await SQLHelper.withTable(spark, tableName)({
       let mergeInto = try await spark.range(1).mergeInto(tableName, "true")
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource().delete().merge()
-      }
-      try await #require(throws: Error.self) {
-        try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+      if await spark.version >= "4.0.0" {
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: SparkConnectError.TableOrViewNotFound) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
+      } else {
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource().delete().merge()
+        }
+        try await #require(throws: Error.self) {
+          try await mergeInto.whenNotMatchedBySource("true").delete().merge()
+        }
       }
     })
     await spark.stop()
