
Commit 6254c8b

[SPARK-52376] Support addArtifact(s)? in SparkSession
1 parent fe6f6d9 commit 6254c8b

File tree

Sources/SparkConnect/SparkConnectClient.swift
Sources/SparkConnect/SparkSession.swift
Tests/SparkConnectTests/SparkSessionTests.swift

3 files changed: 102 additions & 0 deletions

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 33 additions & 0 deletions
@@ -735,6 +735,39 @@ public actor SparkConnectClient {
     return plan
   }
 
+  func addArtifact(_ url: URL) async throws {
+    guard url.lastPathComponent.hasSuffix(".jar") else {
+      throw SparkConnectError.InvalidArgument
+    }
+
+    let JAR_PREFIX = "jars"
+    let name = "\(JAR_PREFIX)/" + url.lastPathComponent
+
+    try await withGPRC { client in
+      let service = SparkConnectService.Client(wrapping: client)
+
+      var chunk = Spark_Connect_AddArtifactsRequest.ArtifactChunk()
+      chunk.data = try Data(contentsOf: url)
+      chunk.crc = Int64(CRC32.checksum(data: chunk.data))
+
+      var singleChunk = Spark_Connect_AddArtifactsRequest.SingleChunkArtifact()
+      singleChunk.name = name
+      singleChunk.data = chunk
+      var batch = Spark_Connect_AddArtifactsRequest.Batch()
+      batch.artifacts.append(singleChunk)
+
+      var addArtifactsRequest = Spark_Connect_AddArtifactsRequest()
+      addArtifactsRequest.sessionID = self.sessionID!
+      addArtifactsRequest.userContext = self.userContext
+      addArtifactsRequest.clientType = self.clientType
+      addArtifactsRequest.batch = batch
+      let request = addArtifactsRequest
+      _ = try await service.addArtifacts(request: StreamingClientRequest<Spark_Connect_AddArtifactsRequest> { x in
+        try await x.write(contentsOf: [request])
+      })
+    }
+  }
+
   /// Add a tag to be assigned to all the operations started by this thread in this session.
   /// - Parameter tag: The tag to be added. Cannot contain ',' (comma) character or be an empty string.
   public func addTag(tag: String) throws {
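
Each call above packs one jar into a single `SingleChunkArtifact` inside a `Batch`, so uploading N files costs N round trips. Since `batch.artifacts` is a repeated field, the same message types could carry several jars in one request. The following is only a sketch of such a batched variant, not part of this commit: the name `addArtifactBatch` is hypothetical, and it assumes the same actor context as `addArtifact` above (`withGPRC`, `CRC32`, and the session fields).

  // Sketch (not in this commit): upload several local .jar files in one
  // AddArtifactsRequest by appending one SingleChunkArtifact per file.
  func addArtifactBatch(_ urls: [URL]) async throws {
    guard urls.allSatisfy({ $0.lastPathComponent.hasSuffix(".jar") }) else {
      throw SparkConnectError.InvalidArgument
    }

    try await withGPRC { client in
      let service = SparkConnectService.Client(wrapping: client)

      // One SingleChunkArtifact per file, all in the same batch.
      var batch = Spark_Connect_AddArtifactsRequest.Batch()
      for url in urls {
        var chunk = Spark_Connect_AddArtifactsRequest.ArtifactChunk()
        chunk.data = try Data(contentsOf: url)
        chunk.crc = Int64(CRC32.checksum(data: chunk.data))

        var artifact = Spark_Connect_AddArtifactsRequest.SingleChunkArtifact()
        artifact.name = "jars/" + url.lastPathComponent
        artifact.data = chunk
        batch.artifacts.append(artifact)
      }

      var addArtifactsRequest = Spark_Connect_AddArtifactsRequest()
      addArtifactsRequest.sessionID = self.sessionID!
      addArtifactsRequest.userContext = self.userContext
      addArtifactsRequest.clientType = self.clientType
      addArtifactsRequest.batch = batch
      let request = addArtifactsRequest
      _ = try await service.addArtifacts(request: StreamingClientRequest<Spark_Connect_AddArtifactsRequest> { x in
        try await x.write(contentsOf: [request])
      })
    }
  }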

Sources/SparkConnect/SparkSession.swift

Lines changed: 30 additions & 0 deletions
@@ -267,6 +267,36 @@ public actor SparkSession {
     return await read.table(tableName)
   }
 
+  /// Add a single artifact to the current session.
+  /// Currently, only local files with the `.jar` extension are supported.
+  /// - Parameter url: A URL to the artifact.
+  public func addArtifact(_ url: URL) async throws {
+    try await self.client.addArtifact(url)
+  }
+
+  /// Add a single artifact to the current session.
+  /// Currently, only local files with the `.jar` extension are supported.
+  /// - Parameter path: A path to the file.
+  public func addArtifact(_ path: String) async throws {
+    try await self.client.addArtifact(URL(fileURLWithPath: path))
+  }
+
+  /// Add one or more artifacts to the current session.
+  /// - Parameter url: One or more URLs of local `.jar` files.
+  public func addArtifacts(_ url: URL...) async throws {
+    for u in url {
+      try await self.client.addArtifact(u)
+    }
+  }
+
+  /// Execute an arbitrary string command inside an external execution engine rather than Spark.
+  /// This could be useful when a user wants to execute some commands out of Spark. For example,
+  /// executing a custom DDL/DML command for JDBC, creating an index for ElasticSearch, creating
+  /// cores for Solr, and so on.
+  /// - Parameters:
+  ///   - runner: The class name of the runner that implements `ExternalCommandRunner`.
+  ///   - command: The target command to be executed.
+  ///   - options: The options for the runner.
   public func executeCommand(_ runner: String, _ command: String, _ options: [String: String])
     async throws -> DataFrame
   {
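
Together, the new `SparkSession` overloads accept a file path, a file URL, or a variadic list of URLs, and all of them delegate to the client's `addArtifact`. Below is a minimal usage sketch, not part of this commit: the jar paths and the `uploadJars` wrapper are placeholders, the `import SparkConnect` assumes the package's `SparkConnect` target, and a reachable Spark Connect server is assumed.

  import Foundation
  import SparkConnect

  // Sketch: upload local jars to a Spark Connect session.
  // The jar paths below are placeholders.
  func uploadJars() async throws {
    let spark = try await SparkSession.builder.getOrCreate()

    try await spark.addArtifact("udfs.jar")                           // by path
    try await spark.addArtifact(URL(fileURLWithPath: "deps/a.jar"))   // by URL
    try await spark.addArtifacts(
      URL(fileURLWithPath: "deps/b.jar"),
      URL(fileURLWithPath: "deps/c.jar"))                             // variadic

    await spark.stop()
  }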

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 39 additions & 0 deletions
@@ -142,6 +142,45 @@ struct SparkSessionTests {
     await spark.stop()
   }
 
+  @Test
+  func addInvalidArtifact() async throws {
+    await SparkSession.builder.clear()
+    let spark = try await SparkSession.builder.getOrCreate()
+    await #expect(throws: SparkConnectError.InvalidArgument) {
+      try await spark.addArtifact("x.txt")
+    }
+    await spark.stop()
+  }
+
+  @Test
+  func addArtifact() async throws {
+    let fm = FileManager()
+    let path = "my.jar"
+    let url = URL(fileURLWithPath: path)
+
+    await SparkSession.builder.clear()
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
+    try await spark.addArtifact(path)
+    try await spark.addArtifact(url)
+    try fm.removeItem(atPath: path)
+    await spark.stop()
+  }
+
+  @Test
+  func addArtifacts() async throws {
+    let fm = FileManager()
+    let path = "MyCommand.jar"
+    let url = URL(fileURLWithPath: path)
+
+    await SparkSession.builder.clear()
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(fm.createFile(atPath: path, contents: "abc".data(using: .utf8)))
+    try await spark.addArtifacts(url)
+    try fm.removeItem(atPath: path)
+    await spark.stop()
+  }
+
   @Test
   func executeCommand() async throws {
     await SparkSession.builder.clear()
