9 changes: 2 additions & 7 deletions Sources/SparkConnect/DataFrame.swift
@@ -489,13 +489,8 @@ public actor DataFrame: Sendable {
   /// - Parameter schema: The given schema.
   /// - Returns: A ``DataFrame`` with the given schema.
   public func to(_ schema: String) async throws -> DataFrame {
-    // Validate by parsing.
-    do {
-      let dataType = try await sparkSession.client.ddlParse(schema)
-      return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
-    } catch {
-      throw SparkConnectError.InvalidTypeException
-    }
+    let dataType = try await sparkSession.client.ddlParse(schema)
+    return DataFrame(spark: self.spark, plan: SparkConnectClient.getToSchema(self.plan.root, dataType))
   }

   /// Returns the content of the Dataset as a Dataset of JSON strings.
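With the local wrapper gone, `to(_:)` surfaces whatever `ddlParse` throws, and the mapping to `SparkConnectError.InvalidTypeException` now lives in one place inside `SparkConnectClient`. A minimal caller-side sketch of the resulting behavior, assuming a reachable Spark Connect server and the `SparkSession.builder` / `range` / `stop` entry points used elsewhere in this package:

import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
let df = try await spark.range(1)

// A valid DDL string is parsed on the server and yields a re-typed DataFrame.
_ = try await df.to("id STRING")

// An invalid type name still surfaces as InvalidTypeException, but the mapping
// now happens in SparkConnectClient.ddlParse rather than in DataFrame.to(_:).
do {
  _ = try await df.to("id NO_SUCH_TYPE")
} catch SparkConnectError.InvalidTypeException {
  print("rejected: invalid DDL type")
}

await spark.stop()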
7 changes: 1 addition & 6 deletions Sources/SparkConnect/DataFrameReader.swift
@@ -123,12 +123,7 @@ public actor DataFrameReader: Sendable {
   /// - Returns: A ``DataFrameReader``.
   @discardableResult
   public func schema(_ schema: String) async throws -> DataFrameReader {
-    // Validate by parsing.
-    do {
-      try await sparkSession.client.ddlParse(schema)
-    } catch {
-      throw SparkConnectError.InvalidTypeException
-    }
+    try await sparkSession.client.ddlParse(schema)
     self.userSpecifiedSchemaDDL = schema
     return self
   }
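The reader keeps its eager-validation behavior: the DDL string is parsed at `schema(...)` time, so a bad schema fails before any read is attempted. A short sketch, continuing from the session in the previous example and assuming the `spark.read` entry point:

let reader = await spark.read

// Valid DDL: parsed by the server via ddlParse, then stored in userSpecifiedSchemaDDL.
try await reader.schema("name STRING, age INT")

// Invalid DDL: ddlParse fails on the server and the client maps the internal
// error to SparkConnectError.InvalidTypeException; nothing is stored.
do {
  try await reader.schema("name STRING, age BAD_TYPE")
} catch SparkConnectError.InvalidTypeException {
  print("schema rejected before any read")
}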
7 changes: 1 addition & 6 deletions Sources/SparkConnect/DataStreamReader.swift
@@ -50,12 +50,7 @@ public actor DataStreamReader: Sendable {
   /// - Returns: A ``DataStreamReader``.
   @discardableResult
   public func schema(_ schema: String) async throws -> DataStreamReader {
-    // Validate by parsing.
-    do {
-      try await sparkSession.client.ddlParse(schema)
-    } catch {
-      throw SparkConnectError.InvalidTypeException
-    }
+    try await sparkSession.client.ddlParse(schema)
     self.userSpecifiedSchemaDDL = schema
     return self
   }
13 changes: 11 additions & 2 deletions Sources/SparkConnect/SparkConnectClient.swift
@@ -781,8 +781,17 @@ public actor SparkConnectClient {
         ddlParse.ddlString = ddlString
         return OneOf_Analyze.ddlParse(ddlParse)
       })
-      let response = try await service.analyzePlan(request)
-      return response.ddlParse.parsed
+      do {
+        let response = try await service.analyzePlan(request)
+        return response.ddlParse.parsed
+      } catch let error as RPCError where error.code == .internalError {
+        switch error.message {
+        case let m where m.contains("UNSUPPORTED_DATATYPE") || m.contains("INVALID_IDENTIFIER"):
+          throw SparkConnectError.InvalidTypeException
+        default:
+          throw error
+        }
+      }
     }
   }

Review thread on the added `case let m where m.contains(...)` line:

Member: Is INVALID_IDENTIFIER only raised for invalid types? It sounds like it could also apply to other kinds of identifiers.

Member: Oh, this is only for ddlParse. Okay.

Member (Author): Yes, I'm currently mapping the internal error to SparkConnectError by inspecting the error details. I can map INVALID_IDENTIFIER to its own official SparkConnectError case once I collect more instances.
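If more Spark error classes need mapping later, as the author suggests for INVALID_IDENTIFIER, the string matching could be pulled into a small helper so each new class is a one-line addition. This is a sketch only, not part of this PR; any error classes beyond the two handled above would be hypothetical additions:

// Sketch: map known Spark error classes found in an internal RPCError's message
// to SparkConnectError cases. RPCError is the gRPC Swift error type already
// caught in ddlParse above.
func mapInternalError(_ error: RPCError) -> Error {
  guard error.code == .internalError else { return error }
  // Known error classes and the SparkConnectError they should surface as.
  let mappings: [(errorClass: String, mapped: SparkConnectError)] = [
    ("UNSUPPORTED_DATATYPE", .InvalidTypeException),
    ("INVALID_IDENTIFIER", .InvalidTypeException),
  ]
  for (errorClass, mapped) in mappings where error.message.contains(errorClass) {
    return mapped
  }
  return error
}

The existing catch block would then reduce to `throw mapInternalError(error)`, and promoting INVALID_IDENTIFIER to its own SparkConnectError case later would only change one entry in the table.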