Skip to content

Commit 4c07868

Browse files
committed
Initial Implementation
1 parent 18584e8 commit 4c07868

File tree

7 files changed

+886
-0
lines changed

7 files changed

+886
-0
lines changed

README.md

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Apache Spark Connect Client for Swift
22

33
[![GitHub Actions Build](https://github.com/apache/spark-connect-swift/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/apache/spark-connect-swift/blob/main/.github/workflows/build_and_test.yml)
4+
[![Swift Version Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)
5+
[![Platform Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)
46

57
This is an experimental Swift library to show how to connect to a remote Apache Spark Connect Server and run SQL statements to manipulate remote data.
68

@@ -13,3 +15,96 @@ So far, this library project is tracking the upstream changes like the [Apache S
1315
- [gRPC Swift Protobuf 1.0 (March 2025)](https://github.com/grpc/grpc-swift-protobuf/releases/tag/1.1.0)
1416
- [gRPC Swift NIO Transport 1.0 (March 2025)](https://github.com/grpc/grpc-swift-nio-transport/releases/tag/1.0.1)
1517
- [Apache Arrow Swift](https://github.com/apache/arrow/tree/main/swift)
18+
19+
## Run `Apache Spark 4.0.0 RC2 Connect Server`
20+
21+
$ curl -LO https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc2-bin/spark-4.0.0-bin-hadoop3.tgz
22+
$ tar xvfz spark-4.0.0-bin-hadoop3.tgz
23+
$ cd spark-4.0.0-bin-hadoop3
24+
$ sbin/start-connect-server.sh
25+
26+
## Run tests
27+
28+
```
29+
$ cd spark-connect-swift
30+
$ swift test
31+
```
32+
33+
## How to use in your apps
34+
35+
Create a Swift project.
36+
```
37+
$ mkdir SparkConnectSwiftApp
38+
$ cd SparkConnectSwiftApp
39+
$ swift package init --name SparkConnectSwiftApp --type executable
40+
```
41+
42+
Add `SparkConnect` package to the dependency like the following
43+
```
44+
$ cat Package.swift
45+
import PackageDescription
46+
47+
let package = Package(
48+
name: "SparkConnectSwiftApp",
49+
platforms: [
50+
.macOS(.v15)
51+
],
52+
dependencies: [
53+
.package(url: "https://github.com/dongjoon-hyun/spark-connect-swift.git", branch: "main")
54+
],
55+
targets: [
56+
.executableTarget(
57+
name: "SparkConnectSwiftApp",
58+
dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
59+
)
60+
]
61+
)
62+
```
63+
64+
Use `SparkSession` of `SparkConnect` module in Swift.
65+
66+
```
67+
$ cat Sources/main.swift
68+
69+
import SparkConnect
70+
71+
let spark = try await SparkSession.builder.getOrCreate()
72+
print("Connected to Apache Spark \(await spark.version) Server")
73+
74+
let statements = [
75+
"DROP TABLE IF EXISTS t",
76+
"CREATE TABLE IF NOT EXISTS t(a INT)",
77+
"INSERT INTO t VALUES (1), (2), (3)",
78+
]
79+
80+
for s in statements {
81+
print("EXECUTE: \(s)")
82+
_ = try await spark.sql(s).count()
83+
}
84+
print("SELECT * FROM t")
85+
try await spark.sql("SELECT * FROM t").show()
86+
87+
await spark.stop()
88+
```
89+
90+
Run your Swift application.
91+
92+
```
93+
$ swift run
94+
...
95+
Connected to Apache Spark 4.0.0 Server
96+
EXECUTE: DROP TABLE IF EXISTS t
97+
EXECUTE: CREATE TABLE IF NOT EXISTS t(a INT)
98+
EXECUTE: INSERT INTO t VALUES (1), (2), (3)
99+
SELECT * FROM t
100+
+---+
101+
| a |
102+
+---+
103+
| 2 |
104+
| 1 |
105+
| 3 |
106+
+---+
107+
```
108+
109+
You can find this example in the following repository.
110+
- https://github.com/dongjoon-hyun/spark-connect-swift-app

Sources/SparkConnect/Client.swift

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
19+
import Foundation
20+
import GRPCCore
21+
import GRPCNIOTransportHTTP2
22+
import GRPCProtobuf
23+
import Synchronization
24+
25+
typealias AnalyzePlanRequest = Spark_Connect_AnalyzePlanRequest
26+
typealias AnalyzePlanResponse = Spark_Connect_AnalyzePlanResponse
27+
typealias ConfigRequest = Spark_Connect_ConfigRequest
28+
typealias DataType = Spark_Connect_DataType
29+
typealias ExecutePlanRequest = Spark_Connect_ExecutePlanRequest
30+
typealias Plan = Spark_Connect_Plan
31+
typealias KeyValue = Spark_Connect_KeyValue
32+
typealias Range = Spark_Connect_Range
33+
typealias Relation = Spark_Connect_Relation
34+
typealias SparkConnectService = Spark_Connect_SparkConnectService
35+
typealias UserContext = Spark_Connect_UserContext
36+
37+
/// Conceptually the remote spark session that communicates with the server
38+
public actor Client {
39+
let clientType: String = "swift"
40+
let url: URL
41+
let host: String
42+
let port: Int
43+
let userContext: UserContext
44+
var sessionID: String? = nil
45+
46+
/// Create a client to use GRPCClient.
47+
/// - Parameters:
48+
/// - remote: A string to connect `Spark Connect` server.
49+
/// - user: A string for the user ID of this connection.
50+
init(remote: String, user: String) {
51+
self.url = URL(string: remote)!
52+
self.host = url.host() ?? "localhost"
53+
self.port = self.url.port ?? 15002
54+
self.userContext = user.toUserContext
55+
}
56+
57+
/// Stop the connection. Currently, this API is no-op because we don't reuse the connection.
58+
func stop() {
59+
}
60+
61+
private var service: SparkConnectService.Client<HTTP2ClientTransport.Posix>? = nil
62+
63+
/// Connect to the `Spark Connect` server with the given session ID string.
64+
/// As a test connection, this sends the server `SparkVersion` request.
65+
/// - Parameter sessionID: A string for the session ID.
66+
/// - Returns: An `AnalyzePlanResponse` instance for `SparkVersion`
67+
func connect(_ sessionID: String) async throws -> AnalyzePlanResponse {
68+
try await withGRPCClient(
69+
transport: .http2NIOPosix(
70+
target: .dns(host: self.host, port: self.port),
71+
transportSecurity: .plaintext
72+
)
73+
) { client in
74+
self.sessionID = sessionID
75+
let service = SparkConnectService.Client(wrapping: client)
76+
let version = AnalyzePlanRequest.SparkVersion()
77+
var request = AnalyzePlanRequest()
78+
request.clientType = clientType
79+
request.userContext = userContext
80+
request.sessionID = sessionID
81+
request.analyze = .sparkVersion(version)
82+
let response = try await service.analyzePlan(request)
83+
return response
84+
}
85+
}
86+
87+
/// Create a ``ConfigRequest`` instance for `Set` operation.
88+
/// - Parameter map: A map of key-value string pairs.
89+
/// - Returns: A ``ConfigRequest`` instance.
90+
func getConfigRequestSet(map: [String: String]) -> ConfigRequest {
91+
var request = ConfigRequest()
92+
request.operation = ConfigRequest.Operation()
93+
var set = ConfigRequest.Set()
94+
set.pairs = map.toSparkConnectKeyValue
95+
request.operation.opType = .set(set)
96+
return request
97+
}
98+
99+
/// Request the server to set a map of configurations for this session.
100+
/// - Parameter map: A map of key-value pairs to set.
101+
/// - Returns: Always return true.
102+
func setConf(map: [String: String]) async throws -> Bool {
103+
try await withGRPCClient(
104+
transport: .http2NIOPosix(
105+
target: .dns(host: self.host, port: self.port),
106+
transportSecurity: .plaintext
107+
)
108+
) { client in
109+
let service = SparkConnectService.Client(wrapping: client)
110+
var request = getConfigRequestSet(map: map)
111+
request.clientType = clientType
112+
request.userContext = userContext
113+
request.sessionID = self.sessionID!
114+
let _ = try await service.config(request)
115+
return true
116+
}
117+
}
118+
119+
/// Create a ``ConfigRequest`` instance for `Get` operation.
120+
/// - Parameter keys: An array of keys to get.
121+
/// - Returns: A `ConfigRequest` instance.
122+
func getConfigRequestGet(keys: [String]) -> ConfigRequest {
123+
var request = ConfigRequest()
124+
request.operation = ConfigRequest.Operation()
125+
var get = ConfigRequest.Get()
126+
get.keys = keys
127+
request.operation.opType = .get(get)
128+
return request
129+
}
130+
131+
/// Request the server to get a value of the given key.
132+
/// - Parameter key: A string for key to look up.
133+
/// - Returns: A string for the value of the key.
134+
func getConf(_ key: String) async throws -> String {
135+
try await withGRPCClient(
136+
transport: .http2NIOPosix(
137+
target: .dns(host: self.host, port: self.port),
138+
transportSecurity: .plaintext
139+
)
140+
) { client in
141+
let service = SparkConnectService.Client(wrapping: client)
142+
var request = getConfigRequestGet(keys: [key])
143+
request.clientType = clientType
144+
request.userContext = userContext
145+
request.sessionID = self.sessionID!
146+
let response = try await service.config(request)
147+
return response.pairs[0].value
148+
}
149+
}
150+
151+
/// Create a ``ConfigRequest`` for `GetAll` operation.
152+
/// - Returns: <#description#>
153+
func getConfigRequestGetAll() -> ConfigRequest {
154+
var request = ConfigRequest()
155+
request.operation = ConfigRequest.Operation()
156+
let getAll = ConfigRequest.GetAll()
157+
request.operation.opType = .getAll(getAll)
158+
return request
159+
}
160+
161+
/// Request the server to get all configurations.
162+
/// - Returns: A map of key-value pairs.
163+
func getConfAll() async throws -> [String: String] {
164+
try await withGRPCClient(
165+
transport: .http2NIOPosix(
166+
target: .dns(host: self.host, port: self.port),
167+
transportSecurity: .plaintext
168+
)
169+
) { client in
170+
let service = SparkConnectService.Client(wrapping: client)
171+
var request = getConfigRequestGetAll()
172+
request.clientType = clientType
173+
request.userContext = userContext
174+
request.sessionID = self.sessionID!
175+
let response = try await service.config(request)
176+
var map = [String: String]()
177+
for pair in response.pairs {
178+
map[pair.key] = pair.value
179+
}
180+
return map
181+
}
182+
}
183+
184+
/// Create a `Plan` instance for `Range` relation.
185+
/// - Parameters:
186+
/// - start: A start of the range.
187+
/// - end: A end (exclusive) of the range.
188+
/// - step: A step value for the range from `start` to `end`.
189+
/// - Returns: <#description#>
190+
func getPlanRange(_ start: Int64, _ end: Int64, _ step: Int64) -> Plan {
191+
var range = Range()
192+
range.start = start
193+
range.end = end
194+
range.step = step
195+
var relation = Relation()
196+
relation.range = range
197+
var plan = Plan()
198+
plan.opType = .root(relation)
199+
return plan
200+
}
201+
202+
/// Create a ``ExecutePlanRequest`` instance with the given session ID and plan.
203+
/// The operation ID is created by UUID.
204+
/// - Parameters:
205+
/// - sessionID: A string for the existing session ID.
206+
/// - plan: A plan to execute.
207+
/// - Returns: An ``ExecutePlanRequest`` instance.
208+
func getExecutePlanRequest(_ sessionID: String, _ plan: Plan) async
209+
-> ExecutePlanRequest
210+
{
211+
var request = ExecutePlanRequest()
212+
request.clientType = clientType
213+
request.userContext = userContext
214+
request.sessionID = sessionID
215+
request.operationID = UUID().uuidString
216+
request.plan = plan
217+
return request
218+
}
219+
220+
/// Create a ``AnalyzePlanRequest`` instance with the given session ID and plan.
221+
/// - Parameters:
222+
/// - sessionID: A string for the existing session ID.
223+
/// - plan: A plan to analyze.
224+
/// - Returns: An ``AnalyzePlanRequest`` instance
225+
func getAnalyzePlanRequest(_ sessionID: String, _ plan: Plan) async
226+
-> AnalyzePlanRequest
227+
{
228+
var request = AnalyzePlanRequest()
229+
request.clientType = clientType
230+
request.userContext = userContext
231+
request.sessionID = sessionID
232+
var schema = AnalyzePlanRequest.Schema()
233+
schema.plan = plan
234+
request.analyze = .schema(schema)
235+
return request
236+
}
237+
}

0 commit comments

Comments
 (0)