37 changes: 37 additions & 0 deletions Examples/stream/Package.swift
@@ -0,0 +1,37 @@
// swift-tools-version: 6.0
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

import PackageDescription

let package = Package(
  name: "SparkConnectSwiftNetworkWordCount",
  platforms: [
    .macOS(.v15)
  ],
  dependencies: [
    .package(url: "https://github.com/apache/spark-connect-swift.git", branch: "main")
  ],
  targets: [
    .executableTarget(
      name: "SparkConnectSwiftNetworkWordCount",
      dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")]
    )
  ]
)
68 changes: 68 additions & 0 deletions Examples/stream/README.md
@@ -0,0 +1,68 @@
# A Swift Network Word Count Application with Apache Spark Connect Swift Client

This example is a Swift stream processing application that shows how to count words with the Apache Spark Connect Swift Client library.

## Run `Spark Connect Server`

```bash
./sbin/start-connect-server.sh --wait -c spark.log.level=ERROR
```

## Run `Netcat` as a streaming input server

First, run Netcat (a small utility found in most Unix-like systems) as a data server:

```bash
nc -lk 9999
```

## Start the stream processing application

```bash
$ swift run
...
Connected to Apache Spark 4.0.0 Server
```

## Send input and check output

Any lines typed in the terminal running the `Netcat` server are then counted, and the running totals are printed each time a new micro-batch is processed.

```bash
$ nc -lk 9999
apache spark
apache hadoop
```

The `Spark Connect Server` output will look something like the following.

```bash
-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------+
|word|count(1)|
+----+--------+
+----+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+--------+
|  word|count(1)|
+------+--------+
|apache|       1|
| spark|       1|
+------+--------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+--------+
|  word|count(1)|
+------+--------+
|apache|       2|
| spark|       1|
|hadoop|       1|
+------+--------+
```
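
The count for `apache` reaches 2 in Batch 2 because the application uses the `complete` output mode, which prints the whole result table after each batch rather than only the new rows. The following is a minimal plain-Swift sketch of that accumulation, not the Spark implementation; `RunningWordCount` and `process` are hypothetical names introduced for illustration, and splitting on single spaces is a simplification.

```swift
// Hypothetical helper: keeps totals across batches, like "complete" output mode.
struct RunningWordCount {
  private(set) var totals: [String: Int] = [:]

  // Adds the words of each new line to the running totals and
  // returns the whole table, not just the rows that changed.
  mutating func process(_ lines: [String]) -> [String: Int] {
    for line in lines {
      for word in line.split(separator: " ") {
        totals[String(word), default: 0] += 1
      }
    }
    return totals
  }
}

var counter = RunningWordCount()
let batch1 = counter.process(["apache spark"])   // first line typed into Netcat
let batch2 = counter.process(["apache hadoop"])  // second line typed into Netcat
print(batch1.sorted { $0.key < $1.key })
print(batch2.sorted { $0.key < $1.key })
```

After the second call, `batch2` holds `apache` with 2 and `spark` and `hadoop` with 1 each, which matches the rows shown for Batch 2 above.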
49 changes: 49 additions & 0 deletions Examples/stream/Sources/main.swift
@@ -0,0 +1,49 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
print("Connected to Apache Spark \(await spark.version) Server")

let lines =
  await spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

let word =
  await lines
  .selectExpr("explode(split(value, ' ')) as word")

let wordCounts =
  await word
  .groupBy("word")
  .agg("count(*)")

let query =
  try await wordCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

_ = try await query.awaitTermination()
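
For readers less familiar with Spark SQL, the transformations in `main.swift` can be approximated on an in-memory array. This is a plain-Swift sketch for illustration only: it uses the standard library rather than the SparkConnect API, it handles a single fixed batch instead of a stream, and the sample `lines` are the two inputs from the README.

```swift
// Sample input, assumed for illustration (taken from the README example).
let lines = ["apache spark", "apache hadoop"]

// Roughly what explode(split(value, ' ')) does: one element per word.
let words = lines.flatMap { line in
  line.split(separator: " ").map { String($0) }
}

// Roughly what groupBy("word").agg("count(*)") does: count rows per word.
let wordCounts = Dictionary(grouping: words, by: { $0 }).mapValues { $0.count }

// Sort by word so the printed order is deterministic.
for (word, count) in wordCounts.sorted(by: { $0.key < $1.key }) {
  print("\(word): \(count)")
}
```

The real application differs in that the socket source keeps delivering rows, so the aggregation is updated incrementally instead of being computed once.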
2 changes: 1 addition & 1 deletion README.md
@@ -108,7 +108,7 @@ SELECT * FROM t
 +----+
 ```

-You can find more complete examples including Web Server application in the `Examples` directory.
+You can find more complete examples including Web Server and Streaming applications in the `Examples` directory.

 ## How to use `Spark SQL REPL` via `Spark Connect for Swift`

27 changes: 13 additions & 14 deletions Sources/SparkConnect/DataStreamWriter.swift
@@ -19,10 +19,10 @@
 import Foundation

 public enum Trigger {
-  case OneTimeTrigger
-  case AvailableNowTrigger
-  case ProcessingTimeTrigger(intervalMs: Int64)
-  case ContinuousTrigger(intervalMs: Int64)
+  case OneTime
+  case AvailableNow
+  case ProcessingTime(_ intervalMs: Int64)
+  case Continuous(_ intervalMs: Int64)
 }

 /// An actor used to write a streaming `DataFrame` to external storage systems
@@ -32,7 +32,7 @@ public actor DataStreamWriter: Sendable {

   var source: String? = nil

-  var trigger: Trigger? = nil
+  var trigger: Trigger = Trigger.ProcessingTime(0)

   var path: String? = nil

@@ -155,15 +155,14 @@ public actor DataStreamWriter: Sendable {
     }
     writeStreamOperationStart.trigger =
       switch self.trigger {
-      case .ProcessingTimeTrigger(let intervalMs):
-        .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
-      case .OneTimeTrigger:
-        .once(true)
-      case .AvailableNowTrigger:
-        .availableNow(true)
-      case .ContinuousTrigger(let intervalMs):
-        .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
-      default: .once(true)
+      case .ProcessingTime(let intervalMs):
+        .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+      case .OneTime:
+        .once(true)
+      case .AvailableNow:
+        .availableNow(true)
+      case .Continuous(let intervalMs):
+        .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
       }
     if let outputMode = self.outputMode {
       writeStreamOperationStart.outputMode = outputMode
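
The renamed cases and the now-exhaustive `switch` can be tried outside the library with a standalone mirror of the enum. This is only a sketch: `TriggerSketch` and `describe` are hypothetical names, and the returned strings merely label which trigger field the real code sets; they are not library output.

```swift
// Standalone mirror of the renamed enum; not the SparkConnect type itself.
enum TriggerSketch {
  case OneTime
  case AvailableNow
  case ProcessingTime(_ intervalMs: Int64)
  case Continuous(_ intervalMs: Int64)
}

// Hypothetical helper: names the trigger field and value chosen for each case.
// No `default:` branch is needed because every case is handled.
func describe(_ trigger: TriggerSketch) -> String {
  switch trigger {
  case .ProcessingTime(let intervalMs):
    return "processingTimeInterval: INTERVAL \(intervalMs) MILLISECOND"
  case .OneTime:
    return "once: true"
  case .AvailableNow:
    return "availableNow: true"
  case .Continuous(let intervalMs):
    return "continuousCheckpointInterval: INTERVAL \(intervalMs) MILLISECOND"
  }
}

// Same default as the new `trigger` property in the diff above.
let defaultTrigger: TriggerSketch = .ProcessingTime(0)
print(describe(defaultTrigger))
print(describe(.ProcessingTime(1000)))
```

Because `trigger` is no longer optional, the compiler checks that every case is covered, and a writer that never calls `.trigger(...)` now uses `ProcessingTime(0)` instead of falling through to `.once(true)`.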
2 changes: 1 addition & 1 deletion Tests/SparkConnectTests/DataStreamTests.swift
@@ -52,7 +52,7 @@ struct DataStreamTests {
       .option("checkpointLocation", checkpoint)
       .outputMode("append")
       .format("orc")
-      .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+      .trigger(Trigger.ProcessingTime(1000))
       .start(output)
     #expect(try await query.isActive)
     // Wait for processing