Skip to content

Commit 60fa9c4

Browse files
committed
[SPARK-52376] Support addArtifact(s)? in SparkSession
### What changes were proposed in this pull request? This PR aims to support the following APIs in `SparkSession`. - `addArtifact(_ path: String)` - `addArtifact(_ url: URL)` - `addArtifacts(_ url: URL...)` ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No behavior change. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #193 from dongjoon-hyun/SPARK-52376. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent fe6f6d9 commit 60fa9c4

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -735,6 +735,39 @@ public actor SparkConnectClient {
735735
return plan
736736
}
737737

738+
func addArtifact(_ url: URL) async throws {
739+
guard url.lastPathComponent.hasSuffix(".jar") else {
740+
throw SparkConnectError.InvalidArgument
741+
}
742+
743+
let JAR_PREFIX = "jars"
744+
let name = "\(JAR_PREFIX)/" + url.lastPathComponent
745+
746+
try await withGPRC { client in
747+
let service = SparkConnectService.Client(wrapping: client)
748+
749+
var chunk = Spark_Connect_AddArtifactsRequest.ArtifactChunk()
750+
chunk.data = try Data(contentsOf: url)
751+
chunk.crc = Int64(CRC32.checksum(data: chunk.data))
752+
753+
var singleChunk = Spark_Connect_AddArtifactsRequest.SingleChunkArtifact()
754+
singleChunk.name = name
755+
singleChunk.data = chunk
756+
var batch = Spark_Connect_AddArtifactsRequest.Batch()
757+
batch.artifacts.append(singleChunk)
758+
759+
var addArtifactsRequest = Spark_Connect_AddArtifactsRequest()
760+
addArtifactsRequest.sessionID = self.sessionID!
761+
addArtifactsRequest.userContext = self.userContext
762+
addArtifactsRequest.clientType = self.clientType
763+
addArtifactsRequest.batch = batch
764+
let request = addArtifactsRequest
765+
_ = try await service.addArtifacts(request: StreamingClientRequest<Spark_Connect_AddArtifactsRequest> { x in
766+
try await x.write(contentsOf: [request])
767+
})
768+
}
769+
}
770+
738771
/// Add a tag to be assigned to all the operations started by this thread in this session.
739772
/// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
740773
public func addTag(tag: String) throws {

Sources/SparkConnect/SparkSession.swift

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,36 @@ public actor SparkSession {
267267
return await read.table(tableName)
268268
}
269269

270+
/// Add a single artifact to the current session.
271+
/// Currently only local files with extensions .jar supported.
272+
/// - Parameter url: A url to the artifact
273+
public func addArtifact(_ url: URL) async throws {
274+
try await self.client.addArtifact(url)
275+
}
276+
277+
/// Add a single artifact to the current session.
278+
/// Currently only local files with extensions .jar are supported.
279+
/// - Parameter path: A path to the file.
280+
public func addArtifact(_ path: String) async throws {
281+
try await self.client.addArtifact(URL(fileURLWithPath: path))
282+
}
283+
284+
/// Add one or more artifacts to the session.
285+
/// - Parameter url: One or more URLs
286+
public func addArtifacts(_ url: URL...) async throws {
287+
for u in url {
288+
try await self.client.addArtifact(u)
289+
}
290+
}
291+
292+
/// Execute an arbitrary string command inside an external execution engine rather than Spark.
293+
/// This could be useful when user wants to execute some commands out of Spark. For example,
294+
/// executing custom DDL/DML command for JDBC, creating index for ElasticSearch, creating cores
295+
/// for Solr and so on.
296+
/// - Parameters:
297+
/// - runner: The class name of the runner that implements `ExternalCommandRunner`.
298+
/// - command: The target command to be executed
299+
/// - options: The options for the runner.
270300
public func executeCommand(_ runner: String, _ command: String, _ options: [String: String])
271301
async throws -> DataFrame
272302
{

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,49 @@ struct SparkSessionTests {
142142
await spark.stop()
143143
}
144144

145+
@Test
146+
func addInvalidArtifact() async throws {
147+
await SparkSession.builder.clear()
148+
let spark = try await SparkSession.builder.getOrCreate()
149+
await #expect(throws: SparkConnectError.InvalidArgument) {
150+
try await spark.addArtifact("x.txt")
151+
}
152+
await spark.stop()
153+
}
154+
155+
@Test
156+
func addArtifact() async throws {
157+
let fm = FileManager()
158+
let path = "my.jar"
159+
let url = URL(fileURLWithPath: path)
160+
161+
await SparkSession.builder.clear()
162+
let spark = try await SparkSession.builder.getOrCreate()
163+
#expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
164+
if await spark.version.starts(with: "4.") {
165+
try await spark.addArtifact(path)
166+
try await spark.addArtifact(url)
167+
}
168+
try fm.removeItem(atPath: path)
169+
await spark.stop()
170+
}
171+
172+
@Test
173+
func addArtifacts() async throws {
174+
let fm = FileManager()
175+
let path = "my.jar"
176+
let url = URL(fileURLWithPath: path)
177+
178+
await SparkSession.builder.clear()
179+
let spark = try await SparkSession.builder.getOrCreate()
180+
#expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
181+
if await spark.version.starts(with: "4.") {
182+
try await spark.addArtifacts(url, url)
183+
}
184+
try fm.removeItem(atPath: path)
185+
await spark.stop()
186+
}
187+
145188
@Test
146189
func executeCommand() async throws {
147190
await SparkSession.builder.clear()

0 commit comments

Comments
 (0)