Skip to content

Commit dc6d0e8

Browse files
committed
[SPARK-52361] Support executeCommand in SparkSession
### What changes were proposed in this pull request? This PR aims to support `executeCommand` API of `SparkSession`. In addition, `DataSourceNotFound` is added to `SparkConnectError`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? Pass the CIs. Since `Apache Spark` distribution has no built-in `ExternalCommandRunner`. I crafted the following for manual testing. In this PR, the added test case only checks the API invocation part. ```bash $ cat MyCommand.java public class MyCommand implements org.apache.spark.sql.connector.ExternalCommandRunner { Override public String[] executeCommand(String command, org.apache.spark.sql.util.CaseInsensitiveStringMap options) { String[] result = {"Hello", command}; return result; } } ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #187 from dongjoon-hyun/SPARK-52361. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 30bd8c4 commit dc6d0e8

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ public actor SparkConnectClient {
141141
throw SparkConnectError.TableOrViewNotFound
142142
case let m where m.contains("Invalid view name:"):
143143
throw SparkConnectError.InvalidViewName
144+
case let m where m.contains("DATA_SOURCE_NOT_FOUND"):
145+
throw SparkConnectError.DataSourceNotFound
144146
default:
145147
throw error
146148
}
@@ -715,6 +717,24 @@ public actor SparkConnectClient {
715717
return result
716718
}
717719

720+
func getExecuteExternalCommand(
721+
_ runner: String,
722+
_ command: String,
723+
_ options: [String: String]
724+
) async throws -> Plan {
725+
var executeExternalCommand = Spark_Connect_ExecuteExternalCommand()
726+
executeExternalCommand.runner = runner
727+
executeExternalCommand.command = command
728+
executeExternalCommand.options = options
729+
var command = Command()
730+
command.commandType = .executeExternalCommand(executeExternalCommand)
731+
let response = try await execute(self.sessionID!, command)
732+
let relation = response.first!.sqlCommandResult.relation
733+
var plan = Plan()
734+
plan.opType = .root(relation)
735+
return plan
736+
}
737+
718738
/// Add a tag to be assigned to all the operations started by this thread in this session.
719739
/// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
720740
public func addTag(tag: String) throws {

Sources/SparkConnect/SparkConnectError.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
public enum SparkConnectError: Error {
2222
case CatalogNotFound
2323
case ColumnNotFound
24+
case DataSourceNotFound
2425
case InvalidArgument
2526
case InvalidSessionID
2627
case InvalidType

Sources/SparkConnect/SparkSession.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,13 @@ public actor SparkSession {
267267
return await read.table(tableName)
268268
}
269269

270+
public func executeCommand(_ runner: String, _ command: String, _ options: [String: String])
271+
async throws -> DataFrame
272+
{
273+
let plan = try await self.client.getExecuteExternalCommand(runner, command, options)
274+
return DataFrame(spark: self, plan: plan)
275+
}
276+
270277
/// Executes a code block and prints the execution time.
271278
///
272279
/// This utility method is useful for performance testing and optimization.

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,18 @@ struct SparkSessionTests {
141141
}
142142
await spark.stop()
143143
}
144+
145+
@Test
146+
func executeCommand() async throws {
147+
await SparkSession.builder.clear()
148+
let spark = try await SparkSession.builder.getOrCreate()
149+
if await spark.version.starts(with: "4.") {
150+
await #expect(throws: SparkConnectError.DataSourceNotFound) {
151+
try await spark.executeCommand("runner", "command", [:]).show()
152+
}
153+
}
154+
await spark.stop()
155+
}
144156
#endif
145157

146158
@Test

0 commit comments

Comments
 (0)