Skip to content

Commit ce65f4a

Browse files
committed
[SPARK-52071] Add stream example and revise Trigger names
### What changes were proposed in this pull request?

This PR aims to add a `stream` example and revise the `Trigger` names to be more consistent with the Scala version.

### Why are the changes needed?

To give an illustrative example in `Swift` by porting `StructuredNetworkWordCount.scala`.
- https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala

### Does this PR introduce _any_ user-facing change?

No. This is a change on an unreleased feature.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#128 from dongjoon-hyun/SPARK-52071.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c5f1ff7 commit ce65f4a

File tree

6 files changed

+169
-16
lines changed

6 files changed

+169
-16
lines changed

Examples/stream/Package.swift

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// swift-tools-version: 6.0
2+
//
3+
// Licensed to the Apache Software Foundation (ASF) under one
4+
// or more contributor license agreements. See the NOTICE file
5+
// distributed with this work for additional information
6+
// regarding copyright ownership. The ASF licenses this file
7+
// to you under the Apache License, Version 2.0 (the
8+
// "License"); you may not use this file except in compliance
9+
// with the License. You may obtain a copy of the License at
10+
//
11+
// http://www.apache.org/licenses/LICENSE-2.0
12+
//
13+
// Unless required by applicable law or agreed to in writing,
14+
// software distributed under the License is distributed on an
15+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
// KIND, either express or implied. See the License for the
17+
// specific language governing permissions and limitations
18+
// under the License.
19+
//
20+
21+
import PackageDescription
22+
23+
let package = Package(
24+
name: "SparkConnectSwiftNetworkWordCount",
25+
platforms: [
26+
.macOS(.v15)
27+
],
28+
dependencies: [
29+
.package(url: "https://github.com/apache/spark-connect-swift.git", branch: "main")
30+
],
31+
targets: [
32+
.executableTarget(
33+
name: "SparkConnectSwiftNetworkWordCount",
34+
dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
35+
)
36+
]
37+
)

Examples/stream/README.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# A Swift Network Word Count Application with Apache Spark Connect Swift Client
2+
3+
This is an example Swift stream processing application that shows how to count words with the Apache Spark Connect Swift Client library.
4+
5+
## Run `Spark Connect Server`
6+
7+
```bash
8+
./sbin/start-connect-server.sh --wait -c spark.log.level=ERROR
9+
```
10+
11+
## Run `Netcat` as a streaming input server
12+
13+
You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
14+
15+
```bash
16+
nc -lk 9999
17+
```
18+
19+
## Start streaming processing application
20+
21+
```bash
22+
$ swift run
23+
...
24+
Connected to Apache Spark 4.0.0 Server
25+
```
26+
27+
## Send input and check output
28+
29+
Then, any lines typed in the terminal running the `Netcat` server will be counted and printed on screen every second.
30+
31+
```bash
32+
$ nc -lk 9999
33+
apache spark
34+
apache hadoop
35+
```
36+
37+
`Spark Connect Server` output will look something like the following.
38+
39+
```bash
40+
-------------------------------------------
41+
Batch: 0
42+
-------------------------------------------
43+
+----+--------+
44+
|word|count(1)|
45+
+----+--------+
46+
+----+--------+
47+
48+
-------------------------------------------
49+
Batch: 1
50+
-------------------------------------------
51+
+------+--------+
52+
| word|count(1)|
53+
+------+--------+
54+
|apache| 1|
55+
| spark| 1|
56+
+------+--------+
57+
58+
-------------------------------------------
59+
Batch: 2
60+
-------------------------------------------
61+
+------+--------+
62+
| word|count(1)|
63+
+------+--------+
64+
|apache| 2|
65+
| spark| 1|
66+
|hadoop| 1|
67+
+------+--------+
68+
```

Examples/stream/Sources/main.swift

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
20+
import SparkConnect
21+
22+
let spark = try await SparkSession.builder.getOrCreate()
23+
print("Connected to Apache Spark \(await spark.version) Server")
24+
25+
let lines =
26+
await spark
27+
.readStream
28+
.format("socket")
29+
.option("host", "localhost")
30+
.option("port", "9999")
31+
.load()
32+
33+
let word =
34+
await lines
35+
.selectExpr("explode(split(value, ' ')) as word")
36+
37+
let wordCounts =
38+
await word
39+
.groupBy("word")
40+
.agg("count(*)")
41+
42+
let query =
43+
try await wordCounts
44+
.writeStream
45+
.outputMode("complete")
46+
.format("console")
47+
.start()
48+
49+
_ = try await query.awaitTermination()

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ SELECT * FROM t
108108
+----+
109109
```
110110
111-
You can find more complete examples including Web Server application in the `Examples` directory.
111+
You can find more complete examples including Web Server and Streaming applications in the `Examples` directory.
112112
113113
## How to use `Spark SQL REPL` via `Spark Connect for Swift`
114114

Sources/SparkConnect/DataStreamWriter.swift

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import Foundation
2020

2121
public enum Trigger {
22-
case OneTimeTrigger
23-
case AvailableNowTrigger
24-
case ProcessingTimeTrigger(intervalMs: Int64)
25-
case ContinuousTrigger(intervalMs: Int64)
22+
case OneTime
23+
case AvailableNow
24+
case ProcessingTime(_ intervalMs: Int64)
25+
case Continuous(_ intervalMs: Int64)
2626
}
2727

2828
/// An actor used to write a streaming `DataFrame` to external storage systems
@@ -32,7 +32,7 @@ public actor DataStreamWriter: Sendable {
3232

3333
var source: String? = nil
3434

35-
var trigger: Trigger? = nil
35+
var trigger: Trigger = Trigger.ProcessingTime(0)
3636

3737
var path: String? = nil
3838

@@ -155,15 +155,14 @@ public actor DataStreamWriter: Sendable {
155155
}
156156
writeStreamOperationStart.trigger =
157157
switch self.trigger {
158-
case .ProcessingTimeTrigger(let intervalMs):
159-
.processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
160-
case .OneTimeTrigger:
161-
.once(true)
162-
case .AvailableNowTrigger:
163-
.availableNow(true)
164-
case .ContinuousTrigger(let intervalMs):
165-
.continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
166-
default: .once(true)
158+
case .ProcessingTime(let intervalMs):
159+
.processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
160+
case .OneTime:
161+
.once(true)
162+
case .AvailableNow:
163+
.availableNow(true)
164+
case .Continuous(let intervalMs):
165+
.continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
167166
}
168167
if let outputMode = self.outputMode {
169168
writeStreamOperationStart.outputMode = outputMode

Tests/SparkConnectTests/DataStreamTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ struct DataStreamTests {
5252
.option("checkpointLocation", checkpoint)
5353
.outputMode("append")
5454
.format("orc")
55-
.trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
55+
.trigger(Trigger.ProcessingTime(1000))
5656
.start(output)
5757
#expect(try await query.isActive)
5858
// Wait for processing

0 commit comments

Comments
 (0)