Skip to content

Commit 325c6b4

Browse files
committed
[SPARK-52758] Support defineSqlGraphElements
### What changes were proposed in this pull request? This PR aims to support `defineSqlGraphElements ` API in order to support `Declarative Pipelines` (SPARK-51727) of Apache Spark `4.1.0-preview1`. ### Why are the changes needed? To support the new feature incrementally. ### Does this PR introduce _any_ user-facing change? No, this is a new feature. ### How was this patch tested? Pass the CIs with `4.1.0-preview1` test pipeline. <img width="1447" height="213" alt="Screenshot 2025-07-10 at 10 40 58" src="https://github.com/user-attachments/assets/e3ce1b44-b6b1-45a6-bc34-f457ccbf0615" /> ### Was this patch authored or co-authored using generative AI tooling? No. Closes #213 from dongjoon-hyun/SPARK-52758. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a7e7feb commit 325c6b4

File tree

2 files changed

+47
-0
lines changed

2 files changed

+47
-0
lines changed

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,35 @@ public actor SparkConnectClient {
13031303
}
13041304
}
13051305

1306+
@discardableResult
1307+
func defineSqlGraphElements(
1308+
_ dataflowGraphID: String,
1309+
_ sqlFilePath: String,
1310+
_ sqlText: String
1311+
) async throws -> Bool {
1312+
try await withGPRC { client in
1313+
if UUID(uuidString: dataflowGraphID) == nil {
1314+
throw SparkConnectError.InvalidArgument
1315+
}
1316+
1317+
var elements = Spark_Connect_PipelineCommand.DefineSqlGraphElements()
1318+
elements.dataflowGraphID = dataflowGraphID
1319+
elements.sqlFilePath = sqlFilePath
1320+
elements.sqlText = sqlText
1321+
1322+
var pipelineCommand = Spark_Connect_PipelineCommand()
1323+
pipelineCommand.commandType = .defineSqlGraphElements(elements)
1324+
1325+
var command = Spark_Connect_Command()
1326+
command.commandType = .pipelineCommand(pipelineCommand)
1327+
1328+
let responses = try await execute(self.sessionID!, command)
1329+
return responses.contains {
1330+
$0.responseType == .pipelineCommandResult(Spark_Connect_PipelineCommandResult())
1331+
}
1332+
}
1333+
}
1334+
13061335
private enum URIParams {
13071336
static let PARAM_GRPC_MAX_MESSAGE_SIZE = "grpc_max_message_size"
13081337
static let PARAM_SESSION_ID = "session_id"

Tests/SparkConnectTests/SparkConnectClientTests.swift

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,22 @@ struct SparkConnectClientTests {
164164
}
165165
await client.stop()
166166
}
167+
168+
@Test
169+
func defineSqlGraphElements() async throws {
170+
let client = SparkConnectClient(remote: TEST_REMOTE)
171+
let response = try await client.connect(UUID().uuidString)
172+
173+
try await #require(throws: SparkConnectError.InvalidArgument) {
174+
try await client.defineSqlGraphElements("not-a-uuid-format", "path", "sql")
175+
}
176+
177+
if response.sparkVersion.version.starts(with: "4.1") {
178+
let dataflowGraphID = try await client.createDataflowGraph()
179+
let sqlText = "CREATE MATERIALIZED VIEW mv1 AS SELECT 1"
180+
#expect(UUID(uuidString: dataflowGraphID) != nil)
181+
#expect(try await client.defineSqlGraphElements(dataflowGraphID, "path", sqlText))
182+
}
183+
await client.stop()
184+
}
167185
}

0 commit comments

Comments
 (0)