File tree Expand file tree Collapse file tree 2 files changed +47
-0
lines changed Expand file tree Collapse file tree 2 files changed +47
-0
lines changed Original file line number Diff line number Diff line change @@ -1303,6 +1303,35 @@ public actor SparkConnectClient {
1303
1303
}
1304
1304
}
1305
1305
1306
+ @discardableResult
1307
+ func defineSqlGraphElements(
1308
+ _ dataflowGraphID: String ,
1309
+ _ sqlFilePath: String ,
1310
+ _ sqlText: String
1311
+ ) async throws -> Bool {
1312
+ try await withGPRC { client in
1313
+ if UUID ( uuidString: dataflowGraphID) == nil {
1314
+ throw SparkConnectError . InvalidArgument
1315
+ }
1316
+
1317
+ var elements = Spark_Connect_PipelineCommand . DefineSqlGraphElements ( )
1318
+ elements. dataflowGraphID = dataflowGraphID
1319
+ elements. sqlFilePath = sqlFilePath
1320
+ elements. sqlText = sqlText
1321
+
1322
+ var pipelineCommand = Spark_Connect_PipelineCommand ( )
1323
+ pipelineCommand. commandType = . defineSqlGraphElements( elements)
1324
+
1325
+ var command = Spark_Connect_Command ( )
1326
+ command. commandType = . pipelineCommand( pipelineCommand)
1327
+
1328
+ let responses = try await execute ( self . sessionID!, command)
1329
+ return responses. contains {
1330
+ $0. responseType == . pipelineCommandResult( Spark_Connect_PipelineCommandResult ( ) )
1331
+ }
1332
+ }
1333
+ }
1334
+
1306
1335
private enum URIParams {
1307
1336
static let PARAM_GRPC_MAX_MESSAGE_SIZE = " grpc_max_message_size "
1308
1337
static let PARAM_SESSION_ID = " session_id "
Original file line number Diff line number Diff line change @@ -164,4 +164,22 @@ struct SparkConnectClientTests {
164
164
}
165
165
await client. stop ( )
166
166
}
167
+
168
+ @Test
169
+ func defineSqlGraphElements( ) async throws {
170
+ let client = SparkConnectClient ( remote: TEST_REMOTE)
171
+ let response = try await client. connect ( UUID ( ) . uuidString)
172
+
173
+ try await #require( throws: SparkConnectError . InvalidArgument) {
174
+ try await client. defineSqlGraphElements ( " not-a-uuid-format " , " path " , " sql " )
175
+ }
176
+
177
+ if response. sparkVersion. version. starts ( with: " 4.1 " ) {
178
+ let dataflowGraphID = try await client. createDataflowGraph ( )
179
+ let sqlText = " CREATE MATERIALIZED VIEW mv1 AS SELECT 1 "
180
+ #expect( UUID ( uuidString: dataflowGraphID) != nil )
181
+ #expect( try await client. defineSqlGraphElements ( dataflowGraphID, " path " , sqlText) )
182
+ }
183
+ await client. stop ( )
184
+ }
167
185
}
You can’t perform that action at this time.
0 commit comments