
Commit 0dca569

[SPARK-52742] Support createDataflowGraph
### What changes were proposed in this pull request?

This PR aims to support the `createDataflowGraph` API in order to support `Declarative Pipelines` (SPARK-51727) of Apache Spark `4.1.0-preview1`.

### Why are the changes needed?

To support the new feature incrementally.

### Does this PR introduce _any_ user-facing change?

No, this is a new feature.

### How was this patch tested?

Manually, against `Apache Spark 4.1.0-preview1` RC1:

```
$ sbin/start-connect-server.sh
```

Then run the newly added unit test. (A screenshot of the passing test run, taken 2025-07-09, is attached to the original PR.)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #208 from dongjoon-hyun/SPARK-52742.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 20113d8 commit 0dca569

File tree

2 files changed: +42 −0 lines changed


Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 31 additions & 0 deletions
@@ -1183,6 +1183,37 @@ public actor SparkConnectClient

```swift
    return try await execute(self.sessionID!, command)
  }

  @discardableResult
  func createDataflowGraph(
    _ defaultCatalog: String? = nil,
    _ defaultDatabase: String? = nil,
    _ sqlConf: [String: String]? = nil
  ) async throws -> String {
    try await withGPRC { client in
      var graph = Spark_Connect_PipelineCommand.CreateDataflowGraph()
      if let defaultCatalog {
        graph.defaultCatalog = defaultCatalog
      }
      if let defaultDatabase {
        graph.defaultDatabase = defaultDatabase
      }
      if let sqlConf {
        graph.sqlConf = sqlConf
      }

      var pipelineCommand = Spark_Connect_PipelineCommand()
      pipelineCommand.commandType = .createDataflowGraph(graph)

      var command = Spark_Connect_Command()
      command.commandType = .pipelineCommand(pipelineCommand)

      let response = try await execute(self.sessionID!, command)
      let result = response.first!.pipelineCommandResult.createDataflowGraphResult

      return result.dataflowGraphID
    }
  }

  private enum URIParams {
    static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
    static let PARAM_SESSION_ID = "session_id"
```
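The new client method wraps the graph spec in two protobuf layers (a `PipelineCommand` nested inside a `Command`) before dispatching it over gRPC. That nesting pattern can be sketched standalone; the types below are hypothetical simplified stand-ins for illustration, not the generated `Spark_Connect_*` protobufs:

```swift
// Hypothetical simplified stand-ins for the SwiftProtobuf-generated types.
struct CreateDataflowGraph {
  var defaultCatalog: String?
  var defaultDatabase: String?
  var sqlConf: [String: String] = [:]
}

enum PipelineCommandType {
  case createDataflowGraph(CreateDataflowGraph)
}

struct PipelineCommand {
  var commandType: PipelineCommandType?
}

enum CommandType {
  case pipelineCommand(PipelineCommand)
}

struct Command {
  var commandType: CommandType?
}

// Build the nested command the same way the client method does:
// graph spec -> PipelineCommand -> Command.
func buildCreateDataflowGraphCommand(
  defaultCatalog: String? = nil,
  defaultDatabase: String? = nil
) -> Command {
  var graph = CreateDataflowGraph()
  graph.defaultCatalog = defaultCatalog
  graph.defaultDatabase = defaultDatabase

  var pipelineCommand = PipelineCommand()
  pipelineCommand.commandType = .createDataflowGraph(graph)

  var command = Command()
  command.commandType = .pipelineCommand(pipelineCommand)
  return command
}
```

Keeping each layer an optional `commandType` enum mirrors how protobuf `oneof` fields surface in generated Swift code, which is why the real method assigns `.createDataflowGraph(graph)` and `.pipelineCommand(pipelineCommand)` rather than setting fields directly.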

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 11 additions & 0 deletions
@@ -96,4 +96,15 @@ struct SparkConnectClientTests {

```swift
    }
    await client.stop()
  }

  @Test
  func createDataflowGraph() async throws {
    let client = SparkConnectClient(remote: TEST_REMOTE)
    let response = try await client.connect(UUID().uuidString)
    if response.sparkVersion.version.starts(with: "4.1") {
      let dataflowGraphID = try await client.createDataflowGraph()
      #expect(UUID(uuidString: dataflowGraphID) != nil)
    }
    await client.stop()
  }
}
```
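The test's assertion hinges on Foundation's `UUID(uuidString:)` being a failable initializer: it returns `nil` unless the string is a well-formed UUID, so a non-nil result validates the graph ID the server returned. A minimal standalone sketch of the same check, with a hypothetical helper name:

```swift
import Foundation

// UUID(uuidString:) returns nil for anything that is not a well-formed
// RFC 4122 UUID string, so a non-nil result validates the graph ID format.
func isValidGraphID(_ id: String) -> Bool {
  UUID(uuidString: id) != nil
}
```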
