Commit b5d4f64

Initial Implementation
1 parent 18584e8 commit b5d4f64


6 files changed: +649 -0 lines changed


README.md

Lines changed: 95 additions & 0 deletions
@@ -1,6 +1,8 @@
# Apache Spark Connect Client for Swift

[![GitHub Actions Build](https://github.com/apache/spark-connect-swift/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/apache/spark-connect-swift/blob/main/.github/workflows/build_and_test.yml)
[![Swift Version Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dswift-versions)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)
[![Platform Compatibility](https://img.shields.io/endpoint?url=https%3A%2F%2Fswiftpackageindex.com%2Fapi%2Fpackages%2Fdongjoon-hyun%2Fspark-connect-swift%2Fbadge%3Ftype%3Dplatforms)](https://swiftpackageindex.com/dongjoon-hyun/spark-connect-swift)

This is an experimental Swift library to show how to connect to a remote Apache Spark Connect Server and run SQL statements to manipulate remote data.

@@ -13,3 +15,96 @@ So far, this library project is tracking the upstream changes like the [Apache S
- [gRPC Swift Protobuf 1.0 (March 2025)](https://github.com/grpc/grpc-swift-protobuf/releases/tag/1.1.0)
- [gRPC Swift NIO Transport 1.0 (March 2025)](https://github.com/grpc/grpc-swift-nio-transport/releases/tag/1.0.1)
- [Apache Arrow Swift](https://github.com/apache/arrow/tree/main/swift)

## Run `Apache Spark 4.0.0 RC2 Connect Server`

```
$ curl -LO https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc2-bin/spark-4.0.0-bin-hadoop3.tgz
$ tar xvfz spark-4.0.0-bin-hadoop3.tgz
$ cd spark-4.0.0-bin-hadoop3
$ sbin/start-connect-server.sh
```

## Run tests

```
$ cd spark-connect-swift
$ swift test
```

## How to use in your apps

Create a Swift project.

```
$ mkdir SparkConnectSwiftApp
$ cd SparkConnectSwiftApp
$ swift package init --name SparkConnectSwiftApp --type executable
```

Add the `SparkConnect` package as a dependency like the following.

```
$ cat Package.swift
// swift-tools-version: 6.0
import PackageDescription

let package = Package(
  name: "SparkConnectSwiftApp",
  platforms: [
    .macOS(.v15)
  ],
  dependencies: [
    .package(url: "https://github.com/dongjoon-hyun/spark-connect-swift.git", branch: "main")
  ],
  targets: [
    .executableTarget(
      name: "SparkConnectSwiftApp",
      dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
    )
  ]
)
```

Use `SparkSession` of the `SparkConnect` module in Swift.

```
$ cat Sources/main.swift

import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
print("Connected to Apache Spark \(await spark.version) Server")

let statements = [
  "DROP TABLE IF EXISTS t",
  "CREATE TABLE IF NOT EXISTS t(a INT)",
  "INSERT INTO t VALUES (1), (2), (3)",
]

for s in statements {
  print("EXECUTE: \(s)")
  _ = try await spark.sql(s).count()
}
print("SELECT * FROM t")
try await spark.sql("SELECT * FROM t").show()

await spark.stop()
```

Run your Swift application.

```
$ swift run
...
Connected to Apache Spark 4.0.0 Server
EXECUTE: DROP TABLE IF EXISTS t
EXECUTE: CREATE TABLE IF NOT EXISTS t(a INT)
EXECUTE: INSERT INTO t VALUES (1), (2), (3)
SELECT * FROM t
+---+
| a |
+---+
| 2 |
| 1 |
| 3 |
+---+
```

You can find this example in the following repository.
- https://github.com/dongjoon-hyun/spark-connect-swift-app

Lines changed: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Atomics
import Foundation
import GRPCCore
import GRPCNIOTransportHTTP2
import GRPCProtobuf
import NIOCore
import SwiftyTextTable
import Synchronization

/// A DataFrame which supports only SQL queries
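///
/// For example, assuming a running Spark Connect server (see the README):
/// ```
/// let spark = try await SparkSession.builder.getOrCreate()
/// let df = try await spark.sql("SELECT * FROM t")
/// try await df.show()
/// ```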
public actor DataFrame: Sendable {
  var spark: SparkSession
  var plan: Plan
  var schema: DataType? = nil
  private var batches: [RecordBatch] = [RecordBatch]()

  /// Create a new `DataFrame` instance with the given Spark session and plan.
  /// - Parameters:
  ///   - spark: A ``SparkSession`` instance to use.
  ///   - plan: A plan to execute.
  init(spark: SparkSession, plan: Plan) async throws {
    self.spark = spark
    self.plan = plan
  }

  /// Create a new `DataFrame` instance with the given SparkSession and a SQL statement.
  /// - Parameters:
  ///   - spark: A `SparkSession` instance to use.
  ///   - sqlText: A SQL statement.
  init(spark: SparkSession, sqlText: String) async throws {
    self.spark = spark
    self.plan = sqlText.toSparkConnectPlan
  }

  /// Set the schema. This is used to store the analyzed schema response from the `Spark Connect` server.
  /// - Parameter schema: A ``DataType`` instance returned by the server.
  private func setSchema(_ schema: DataType) {
    self.schema = schema
  }

  /// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
  /// - Parameter batches: An array of ``RecordBatch`` instances.
  private func addBatches(_ batches: [RecordBatch]) {
    self.batches.append(contentsOf: batches)
  }

  /// A method to access the underlying Spark's `RDD`.
  /// In `Spark Connect`, this feature is not allowed by design.
  public func rdd() throws {
    // SQLSTATE: 0A000
    // [UNSUPPORTED_CONNECT_FEATURE.RDD]
    // Feature is not supported in Spark Connect: Resilient Distributed Datasets (RDDs).
    throw SparkConnectError.UnsupportedOperationException
  }

  /// Return a `JSON` string of the data type because we cannot expose the internal type ``DataType``.
  /// - Returns: a `JSON` string.
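  ///
  /// For example, `try await spark.sql("SELECT * FROM t").schema()` returns the JSON
  /// representation of the table's schema, e.g. a struct type with one integer field `a`
  /// for the README's table `t`.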
  public func schema() async throws -> String {
    var dataType: String? = nil

    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      let response = try await service.analyzePlan(
        spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
      dataType = try response.schema.schema.jsonString()
    }
    return dataType!
  }

  /// Return the total number of rows.
  /// - Returns: an `Int64` value.
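  ///
  /// For example, with the table `t` from the README (three inserted rows),
  /// `try await spark.sql("SELECT * FROM t").count()` returns `3`.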
  public func count() async throws -> Int64 {
    let counter = Atomic(Int64(0))

    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
        response in
        for try await m in response.messages {
          counter.add(m.arrowBatch.rowCount, ordering: .relaxed)
        }
      }
    }
    return counter.load(ordering: .relaxed)
  }

  /// Execute the plan and try to fill `schema` and `batches`.
  private func execute() async throws {
    try await withGRPCClient(
      transport: .http2NIOPosix(
        target: .dns(host: spark.client.host, port: spark.client.port),
        transportSecurity: .plaintext
      )
    ) { client in
      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
      try await service.executePlan(spark.client.getExecutePlanRequest(spark.sessionID, plan)) {
        response in
        for try await m in response.messages {
          if m.hasSchema {
            // The original schema should arrive before ArrowBatches
            await self.setSchema(m.schema)
          }
          let ipcStreamBytes = m.arrowBatch.data
          if !ipcStreamBytes.isEmpty && m.arrowBatch.rowCount > 0 {
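            // The Arrow IPC stream holds two encapsulated messages, each framed as a
            // 4-byte continuation token (0xFFFFFFFF), a 4-byte metadata length, and the
            // metadata bytes: first the schema message (no body), then the record batch
            // message followed by its body. The stream ends with an 8-byte end-of-stream
            // marker, which is why 8 bytes are subtracted when computing the body size.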
            let IPC_CONTINUATION_TOKEN = Int32(-1)
            // Schema
            assert(ipcStreamBytes[0..<4].int32 == IPC_CONTINUATION_TOKEN)
            let schemaSize = Int64(ipcStreamBytes[4..<8].int32)
            let schema = Data(ipcStreamBytes[8..<(8 + schemaSize)])

            // Arrow IPC Data
            assert(
              ipcStreamBytes[(8 + schemaSize)..<(8 + schemaSize + 4)].int32
                == IPC_CONTINUATION_TOKEN)
            var pos: Int64 = 8 + schemaSize + 4
            let dataHeaderSize = Int64(ipcStreamBytes[pos..<(pos + 4)].int32)
            pos += 4
            let dataHeader = Data(ipcStreamBytes[pos..<(pos + dataHeaderSize)])
            pos += dataHeaderSize
            let dataBodySize = Int64(ipcStreamBytes.count) - pos - 8
            let dataBody = Data(ipcStreamBytes[pos..<(pos + dataBodySize)])

            // Read ArrowBatches
            let reader = ArrowReader()
            let arrowResult = ArrowReader.makeArrowReaderResult()
            _ = reader.fromMessage(schema, dataBody: Data(), result: arrowResult)
            _ = reader.fromMessage(dataHeader, dataBody: dataBody, result: arrowResult)
            await self.addBatches(arrowResult.batches)
          }
        }
      }
    }
  }

  /// This feature is intentionally unsupported in order to simplify the Swift client.
  public func collect() async throws {
    throw SparkConnectError.UnsupportedOperationException
  }

  /// Execute the plan and show the result.
  public func show() async throws {
    try await execute()

    if let schema = self.schema {
      var columns: [TextTableColumn] = []
      for f in schema.struct.fields {
        columns.append(TextTableColumn(header: f.name))
      }
      var table = TextTable(columns: columns)
      for batch in self.batches {
        for i in 0..<batch.length {
          var values: [String] = []
          for column in batch.columns {
            let str = column.array as! AsString
            if column.data.isNull(i) {
              values.append("NULL")
            } else {
              values.append(str.asString(i))
            }
          }
          table.addRow(values: values)
        }
      }
      print(table.render())
    }
  }
}
