Skip to content

Commit af7b0c9

Browse files
committed
[SPARK-52317] Identify InvalidTypeException in SparkConnectClient
### What changes were proposed in this pull request? This PR aims to identify `InvalidTypeException` in `SparkConnectClient`. ### Why are the changes needed? To centralize the `InvalidTypeException` handling into a single place. Previously, three actors do the same error handling logic redundantly. - `DataFrame` - `DataFrameReader` - `DataStreamReader` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #176 from dongjoon-hyun/SPARK-52317. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 0dd07eb commit af7b0c9

File tree

4 files changed

+15
-21
lines changed

4 files changed

+15
-21
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -489,13 +489,8 @@ public actor DataFrame: Sendable {
489489
/// - Parameter schema: The given schema.
490490
/// - Returns: A ``DataFrame`` with the given schema.
491491
public func to(_ schema: String) async throws -> DataFrame {
492-
// Validate by parsing.
493-
do {
494-
let dataType = try await sparkSession.client.ddlParse(schema)
495-
return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
496-
} catch {
497-
throw SparkConnectError.InvalidTypeException
498-
}
492+
let dataType = try await sparkSession.client.ddlParse(schema)
493+
return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
499494
}
500495

501496
/// Returns the content of the Dataset as a Dataset of JSON strings.

Sources/SparkConnect/DataFrameReader.swift

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,7 @@ public actor DataFrameReader: Sendable {
123123
/// - Returns: A ``DataFrameReader``.
124124
@discardableResult
125125
public func schema(_ schema: String) async throws -> DataFrameReader {
126-
// Validate by parsing.
127-
do {
128-
try await sparkSession.client.ddlParse(schema)
129-
} catch {
130-
throw SparkConnectError.InvalidTypeException
131-
}
126+
try await sparkSession.client.ddlParse(schema)
132127
self.userSpecifiedSchemaDDL = schema
133128
return self
134129
}

Sources/SparkConnect/DataStreamReader.swift

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,7 @@ public actor DataStreamReader: Sendable {
5050
/// - Returns: A ``DataStreamReader``.
5151
@discardableResult
5252
public func schema(_ schema: String) async throws -> DataStreamReader {
53-
// Validate by parsing.
54-
do {
55-
try await sparkSession.client.ddlParse(schema)
56-
} catch {
57-
throw SparkConnectError.InvalidTypeException
58-
}
53+
try await sparkSession.client.ddlParse(schema)
5954
self.userSpecifiedSchemaDDL = schema
6055
return self
6156
}

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -781,8 +781,17 @@ public actor SparkConnectClient {
781781
ddlParse.ddlString = ddlString
782782
return OneOf_Analyze.ddlParse(ddlParse)
783783
})
784-
let response = try await service.analyzePlan(request)
785-
return response.ddlParse.parsed
784+
do {
785+
let response = try await service.analyzePlan(request)
786+
return response.ddlParse.parsed
787+
} catch let error as RPCError where error.code == .internalError {
788+
switch error.message {
789+
case let m where m.contains("UNSUPPORTED_DATATYPE") || m.contains("INVALID_IDENTIFIER"):
790+
throw SparkConnectError.InvalidTypeException
791+
default:
792+
throw error
793+
}
794+
}
786795
}
787796
}
788797

0 commit comments

Comments (0)