diff --git a/Examples/stream/Package.swift b/Examples/stream/Package.swift new file mode 100644 index 0000000..8c8ef7a --- /dev/null +++ b/Examples/stream/Package.swift @@ -0,0 +1,37 @@ +// swift-tools-version: 6.0 +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +import PackageDescription + +let package = Package( + name: "SparkConnectSwiftNetworkWordCount", + platforms: [ + .macOS(.v15) + ], + dependencies: [ + .package(url: "https://github.com/apache/spark-connect-swift.git", branch: "main") + ], + targets: [ + .executableTarget( + name: "SparkConnectSwiftNetworkWordCount", + dependencies: [.product(name: "SparkConnect", package: "spark-connect-swift")] + ) + ] +) diff --git a/Examples/stream/README.md b/Examples/stream/README.md new file mode 100644 index 0000000..ee8553e --- /dev/null +++ b/Examples/stream/README.md @@ -0,0 +1,68 @@ +# A Swift Network Word Count Application with Apache Spark Connect Swift Client + +This is an example Swift stream processing application to show how to count words with Apache Spark Connect Swift Client library. + +## Run `Spark Connect Server` + +```bash +./sbin/start-connect-server.sh --wait -c spark.log.level=ERROR +``` + +## Run `Netcat` as a streaming input server + +You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using + +```bash +nc -lk 9999 +``` + +## Start streaming processing application + +```bash +$ swift run +... +Connected to Apache Spark 4.0.0 Server +``` + +## Send input and check output + +Then, any lines typed in the terminal running the `Netcat` server will be counted and printed on screen every second. + +```bash +$ nc -lk 9999 +apache spark +apache hadoop +``` + + `Spark Connect Server` output will look something like the following. + +```bash +------------------------------------------- +Batch: 0 +------------------------------------------- ++----+--------+ +|word|count(1)| ++----+--------+ ++----+--------+ + +------------------------------------------- +Batch: 1 +------------------------------------------- ++------+--------+ +| word|count(1)| ++------+--------+ +|apache| 1| +| spark| 1| ++------+--------+ + +------------------------------------------- +Batch: 2 +------------------------------------------- ++------+--------+ +| word|count(1)| ++------+--------+ +|apache| 2| +| spark| 1| +|hadoop| 1| ++------+--------+ +``` diff --git a/Examples/stream/Sources/main.swift b/Examples/stream/Sources/main.swift new file mode 100644 index 0000000..2ce9a40 --- /dev/null +++ b/Examples/stream/Sources/main.swift @@ -0,0 +1,49 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +import SparkConnect + +let spark = try await SparkSession.builder.getOrCreate() +print("Connected to Apache Spark \(await spark.version) Server") + +let lines = + await spark + .readStream + .format("socket") + .option("host", "localhost") + .option("port", "9999") + .load() + +let word = + await lines + .selectExpr("explode(split(value, ' ')) as word") + +let wordCounts = + await word + .groupBy("word") + .agg("count(*)") + +let query = + try await wordCounts + .writeStream + .outputMode("complete") + .format("console") + .start() + +_ = try await query.awaitTermination() diff --git a/README.md b/README.md index 395a6a0..2049f8e 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,7 @@ SELECT * FROM t +----+ ``` -You can find more complete examples including Web Server application in the `Examples` directory. +You can find more complete examples including Web Server and Streaming applications in the `Examples` directory. ## How to use `Spark SQL REPL` via `Spark Connect for Swift` diff --git a/Sources/SparkConnect/DataStreamWriter.swift b/Sources/SparkConnect/DataStreamWriter.swift index 0af04ff..9822899 100644 --- a/Sources/SparkConnect/DataStreamWriter.swift +++ b/Sources/SparkConnect/DataStreamWriter.swift @@ -19,10 +19,10 @@ import Foundation public enum Trigger { - case OneTimeTrigger - case AvailableNowTrigger - case ProcessingTimeTrigger(intervalMs: Int64) - case ContinuousTrigger(intervalMs: Int64) + case OneTime + case AvailableNow + case ProcessingTime(_ intervalMs: Int64) + case Continuous(_ intervalMs: Int64) } /// An actor used to write a streaming `DataFrame` to external storage systems @@ -32,7 +32,7 @@ public actor DataStreamWriter: Sendable { var source: String? = nil - var trigger: Trigger? = nil + var trigger: Trigger = Trigger.ProcessingTime(0) var path: String? = nil @@ -155,15 +155,14 @@ public actor DataStreamWriter: Sendable { } writeStreamOperationStart.trigger = switch self.trigger { - case .ProcessingTimeTrigger(let intervalMs): - .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND") - case .OneTimeTrigger: - .once(true) - case .AvailableNowTrigger: - .availableNow(true) - case .ContinuousTrigger(let intervalMs): - .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND") - default: .once(true) + case .ProcessingTime(let intervalMs): + .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND") + case .OneTime: + .once(true) + case .AvailableNow: + .availableNow(true) + case .Continuous(let intervalMs): + .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND") } if let outputMode = self.outputMode { writeStreamOperationStart.outputMode = outputMode diff --git a/Tests/SparkConnectTests/DataStreamTests.swift b/Tests/SparkConnectTests/DataStreamTests.swift index 00af6ae..73fbdf4 100644 --- a/Tests/SparkConnectTests/DataStreamTests.swift +++ b/Tests/SparkConnectTests/DataStreamTests.swift @@ -52,7 +52,7 @@ struct DataStreamTests { .option("checkpointLocation", checkpoint) .outputMode("append") .format("orc") - .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000)) + .trigger(Trigger.ProcessingTime(1000)) .start(output) #expect(try await query.isActive) // Wait for processing