
Commit 49a7739

[SPARK-51793] Support ddlParse and jsonToDdl in SparkConnectClient
### What changes were proposed in this pull request?

This PR aims to support `ddlParse` and `jsonToDdl` in `SparkConnectClient`.

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No, these will be used internally.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #57 from dongjoon-hyun/SPARK-51793.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 6687d72 commit 49a7739

File tree

2 files changed: +62 −0 lines changed


Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 42 additions & 0 deletions
@@ -438,4 +438,46 @@ public actor SparkConnectClient {
   public func clearTags() {
     tags.removeAll()
   }
+
+  /// Parse a DDL string to ``Spark_Connect_DataType`` instance.
+  /// - Parameter ddlString: A string to parse.
+  /// - Returns: A ``Spark_Connect_DataType`` instance.
+  func ddlParse(_ ddlString: String) async throws -> Spark_Connect_DataType {
+    try await withGRPCClient(
+      transport: .http2NIOPosix(
+        target: .dns(host: self.host, port: self.port),
+        transportSecurity: .plaintext
+      )
+    ) { client in
+      let service = SparkConnectService.Client(wrapping: client)
+      let request = analyze(self.sessionID!, {
+        var ddlParse = AnalyzePlanRequest.DDLParse()
+        ddlParse.ddlString = ddlString
+        return OneOf_Analyze.ddlParse(ddlParse)
+      })
+      let response = try await service.analyzePlan(request)
+      return response.ddlParse.parsed
+    }
+  }
+
+  /// Convert a JSON string to a DDL string.
+  /// - Parameter jsonString: A JSON string.
+  /// - Returns: A DDL string.
+  func jsonToDdl(_ jsonString: String) async throws -> String {
+    try await withGRPCClient(
+      transport: .http2NIOPosix(
+        target: .dns(host: self.host, port: self.port),
+        transportSecurity: .plaintext
+      )
+    ) { client in
+      let service = SparkConnectService.Client(wrapping: client)
+      let request = analyze(self.sessionID!, {
+        var jsonToDDL = AnalyzePlanRequest.JsonToDDL()
+        jsonToDDL.jsonString = jsonString
+        return OneOf_Analyze.jsonToDdl(jsonToDDL)
+      })
+      let response = try await service.analyzePlan(request)
+      return response.jsonToDdl.ddlString
+    }
+  }
 }
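
For orientation, here is a minimal usage sketch of the two new methods, written as a hypothetical helper inside the SparkConnect module (both functions are internal, not public). It assumes a Spark Connect server reachable at `sc://localhost`; the inputs and expected outputs mirror the tests added below.

```swift
import Foundation

// Hypothetical helper (not part of this commit) showing the intended call pattern.
// Assumes a Spark Connect server at sc://localhost, as in the tests below.
func demoSchemaConversions() async throws {
  let client = SparkConnectClient(remote: "sc://localhost", user: "test")
  let _ = try await client.connect(UUID().uuidString)

  // DDL string -> Spark_Connect_DataType
  let dataType = try await client.ddlParse("a int")
  print(dataType.simpleString)  // "struct<a:int>"

  // JSON schema string -> DDL string
  let json =
    #"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
  print(try await client.jsonToDdl(json))  // "id BIGINT NOT NULL"

  await client.stop()
}
```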

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 20 additions & 0 deletions
@@ -62,4 +62,24 @@ struct SparkConnectClientTests {
     #expect(await client.getExecutePlanRequest(plan).tags.isEmpty)
     await client.stop()
   }
+
+  @Test
+  func ddlParse() async throws {
+    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let _ = try await client.connect(UUID().uuidString)
+    #expect(try await client.ddlParse("a int").simpleString == "struct<a:int>")
+    await client.stop()
+  }
+
+  #if !os(Linux)  // TODO: Enable this with the official Spark 4 docker image
+  @Test
+  func jsonToDdl() async throws {
+    let client = SparkConnectClient(remote: "sc://localhost", user: "test")
+    let _ = try await client.connect(UUID().uuidString)
+    let json =
+      #"{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}"#
+    #expect(try await client.jsonToDdl(json) == "id BIGINT NOT NULL")
+    await client.stop()
+  }
+  #endif
 }
