From 41d252c5c3a7bd2678bed9fe93cbcee50ba76257 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 11 May 2025 08:23:06 -0700
Subject: [PATCH 1/2] [SPARK-52069] Support `DataStreamReader` and `DataStreamWriter`

---
 Sources/SparkConnect/DataFrame.swift          |   9 +-
 Sources/SparkConnect/DataStreamReader.swift   | 221 ++++++++++++++++++
 Sources/SparkConnect/DataStreamWriter.swift   | 209 +++++++++++++++++
 Sources/SparkConnect/SparkSession.swift       |  23 +-
 Sources/SparkConnect/StreamingQuery.swift     |   4 +-
 Sources/SparkConnect/TypeAliases.swift        |   1 +
 Tests/SparkConnectTests/DataStreamTests.swift |  69 ++++++
 7 files changed, 530 insertions(+), 6 deletions(-)
 create mode 100644 Sources/SparkConnect/DataStreamReader.swift
 create mode 100644 Sources/SparkConnect/DataStreamWriter.swift
 create mode 100644 Tests/SparkConnectTests/DataStreamTests.swift

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index ff30f25..34fa77d 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -1381,7 +1381,7 @@ public actor DataFrame: Sendable {
   /// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
   public var write: DataFrameWriter {
     get {
-      return DataFrameWriter(df: self)
+      DataFrameWriter(df: self)
     }
   }
 
@@ -1391,4 +1391,11 @@ public actor DataFrame: Sendable {
   public func writeTo(_ table: String) -> DataFrameWriterV2 {
     return DataFrameWriterV2(table, self)
   }
+
+  /// Returns a ``DataStreamWriter`` that can be used to write streaming data.
+  public var writeStream: DataStreamWriter {
+    get {
+      DataStreamWriter(df: self)
+    }
+  }
 }
diff --git a/Sources/SparkConnect/DataStreamReader.swift b/Sources/SparkConnect/DataStreamReader.swift
new file mode 100644
index 0000000..41763e7
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamReader.swift
@@ -0,0 +1,221 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+/// An actor to load a streaming `Dataset` from external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream` to access this.
+public actor DataStreamReader: Sendable {
+  var source: String = ""
+
+  var paths: [String] = []
+
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:])
+
+  var userSpecifiedSchemaDDL: String? = nil
+
+  let sparkSession: SparkSession
+
+  init(sparkSession: SparkSession) {
+    self.sparkSession = sparkSession
+  }
+
+  /// Specifies the input data source format.
+  /// - Parameter source: A string.
+  /// - Returns: A ``DataStreamReader``.
+  public func format(_ source: String) -> DataStreamReader {
+    self.source = source
+    return self
+  }
+
+  /// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+  /// automatically from data. By specifying the schema here, the underlying data source can skip
+  /// the schema inference step, and thus speed up data loading.
+  /// - Parameter schema: A DDL schema string.
+  /// - Returns: A `DataStreamReader`.
+  @discardableResult
+  public func schema(_ schema: String) async throws -> DataStreamReader {
+    // Validate by parsing.
+    do {
+      try await sparkSession.client.ddlParse(schema)
+    } catch {
+      throw SparkConnectError.InvalidTypeException
+    }
+    self.userSpecifiedSchemaDDL = schema
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: String) -> DataStreamReader {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Bool` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Bool) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: An `Int64` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Int64) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds an input option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A `Double` value.
+  /// - Returns: A `DataStreamReader`.
+  public func option(_ key: String, _ value: Double) -> DataStreamReader {
+    self.extraOptions[key] = String(value)
+    return self
+  }
+
+  /// Adds input options for the underlying data source.
+  /// - Parameter options: A string-string dictionary.
+  /// - Returns: A `DataStreamReader`.
+  public func options(_ options: [String: String]) -> DataStreamReader {
+    for (key, value) in options {
+      self.extraOptions[key] = value
+    }
+    return self
+  }
+
+  /// Loads input data stream in as a `DataFrame`, for data streams that don't require a path
+  /// (e.g. external key-value stores).
+  /// - Returns: A `DataFrame`.
+  public func load() -> DataFrame {
+    return load([])
+  }
+
+  /// Loads input data stream in as a `DataFrame`, for data streams that require a path
+  /// (e.g. data backed by a local or distributed file system).
+  /// - Parameter path: A path string.
+  /// - Returns: A `DataFrame`.
+  public func load(_ path: String) -> DataFrame {
+    return load([path])
+  }
+
+  func load(_ paths: [String]) -> DataFrame {
+    self.paths = paths
+
+    var dataSource = DataSource()
+    dataSource.format = self.source
+    dataSource.paths = self.paths
+    dataSource.options = self.extraOptions.toStringDictionary()
+    if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL {
+      dataSource.schema = userSpecifiedSchemaDDL
+    }
+
+    var read = Read()
+    read.dataSource = dataSource
+    read.isStreaming = true
+
+    var relation = Relation()
+    relation.read = read
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
+  /// Defines a streaming DataFrame on a table. The DataSource corresponding to the table should
+  /// support streaming mode.
+  /// - Parameter tableName: The name of the table.
+  /// - Returns: A ``DataFrame``.
+  public func table(_ tableName: String) -> DataFrame {
+    var namedTable = NamedTable()
+    namedTable.unparsedIdentifier = tableName
+    namedTable.options = self.extraOptions.toStringDictionary()
+
+    var read = Read()
+    read.namedTable = namedTable
+    read.isStreaming = true
+
+    var relation = Relation()
+    relation.read = read
+
+    var plan = Plan()
+    plan.opType = .root(relation)
+
+    return DataFrame(spark: sparkSession, plan: plan)
+  }
+
+  /// Loads a text file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func text(_ path: String) -> DataFrame {
+    self.source = "text"
+    return load(path)
+  }
+
+  /// Loads a CSV file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func csv(_ path: String) -> DataFrame {
+    self.source = "csv"
+    return load(path)
+  }
+
+  /// Loads a JSON file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func json(_ path: String) -> DataFrame {
+    self.source = "json"
+    return load(path)
+  }
+
+  /// Loads an XML file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func xml(_ path: String) -> DataFrame {
+    self.source = "xml"
+    return load(path)
+  }
+
+  /// Loads an ORC file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func orc(_ path: String) -> DataFrame {
+    self.source = "orc"
+    return load(path)
+  }
+
+  /// Loads a Parquet file stream and returns the result as a `DataFrame`.
+  /// - Parameter path: A path string
+  /// - Returns: A `DataFrame`.
+  public func parquet(_ path: String) -> DataFrame {
+    self.source = "parquet"
+    return load(path)
+  }
+}
diff --git a/Sources/SparkConnect/DataStreamWriter.swift b/Sources/SparkConnect/DataStreamWriter.swift
new file mode 100644
index 0000000..0af04ff
--- /dev/null
+++ b/Sources/SparkConnect/DataStreamWriter.swift
@@ -0,0 +1,209 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+import Foundation
+
+public enum Trigger {
+  case OneTimeTrigger
+  case AvailableNowTrigger
+  case ProcessingTimeTrigger(intervalMs: Int64)
+  case ContinuousTrigger(intervalMs: Int64)
+}
+
+/// An actor used to write a streaming `DataFrame` to external storage systems
+/// (e.g. file systems, key-value stores, etc). Use `DataFrame.writeStream` to access this.
+public actor DataStreamWriter: Sendable {
+  var queryName: String? = nil
+
+  var source: String? = nil
+
+  var trigger: Trigger? = nil
+
+  var path: String? = nil
+
+  var tableName: String? = nil
+
+  var outputMode: String? = nil
+
+  var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary()
+
+  var partitioningColumns: [String]? = nil
+
+  var clusteringColumns: [String]? = nil
+
+  let df: DataFrame
+
+  init(df: DataFrame) {
+    self.df = df
+  }
+
+  /// Specifies the name of the ``StreamingQuery`` that can be
+  /// started with `start()`. This name must be unique among all the currently active queries in
+  /// the associated SparkSession.
+  /// - Parameter queryName: A string name.
+  /// - Returns: A ``DataStreamWriter``.
+  public func queryName(_ queryName: String) -> DataStreamWriter {
+    self.queryName = queryName
+    return self
+  }
+
+  /// Specifies the underlying output data source.
+  /// - Parameter source: A string.
+  /// - Returns: A `DataStreamWriter`.
+  public func format(_ source: String) -> DataStreamWriter {
+    self.source = source
+    return self
+  }
+
+  /// Specifies how data of a streaming ``DataFrame`` is written to a streaming sink.
+  ///
+  /// - `append`: only the new rows in the streaming ``DataFrame`` will be written to the sink.
+  /// - `complete`: all the rows in the streaming ``DataFrame`` will be written to the sink
+  ///   every time there are some updates.
+  /// - `update`: only the rows that were updated in the streaming ``DataFrame`` will be
+  ///   written to the sink every time there are some updates. If the query doesn't contain
+  ///   aggregations, it will be equivalent to `append` mode.
+  ///
+  /// - Parameter outputMode: A string for outputMode.
+  /// - Returns: A ``DataStreamWriter``.
+  public func outputMode(_ outputMode: String) -> DataStreamWriter {
+    self.outputMode = outputMode
+    return self
+  }
+
+  /// Adds an output option for the underlying data source.
+  /// - Parameters:
+  ///   - key: A key string.
+  ///   - value: A value string.
+  /// - Returns: A `DataStreamWriter`.
+  public func option(_ key: String, _ value: String) -> DataStreamWriter {
+    self.extraOptions[key] = value
+    return self
+  }
+
+  /// Partitions the output by the given columns on the file system. If specified, the output is
+  /// laid out on the file system similar to Hive's partitioning scheme.
+  /// - Parameter colNames: Column names to partition.
+  /// - Returns: A ``DataStreamWriter``.
+  public func partitionBy(_ colNames: String...) -> DataStreamWriter {
+    self.partitioningColumns = colNames
+    return self
+  }
+
+  /// Clusters the output by the given columns. If specified, the output is laid out such that
+  /// records with similar values on the clustering column are grouped together in the same file.
+  /// - Parameter colNames: Column names to cluster.
+  /// - Returns: A ``DataStreamWriter``.
+  public func clusterBy(_ colNames: String...) -> DataStreamWriter {
+    self.clusteringColumns = colNames
+    return self
+  }
+
+  /// Sets the trigger that controls how often the streaming query is executed
+  /// (e.g. one-time, available-now, processing-time, or continuous).
+  /// - Parameter trigger: A ``Trigger`` value.
+  /// - Returns: A ``DataStreamWriter``.
+  public func trigger(_ trigger: Trigger) async throws -> DataStreamWriter {
+    self.trigger = trigger
+    return self
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// given path as new data arrives. The returned ``StreamingQuery`` object can be used to interact
+  /// with the stream.
+  /// - Parameter path: A path to write.
+  /// - Returns: A ``StreamingQuery``.
+  public func start(_ path: String) async throws -> StreamingQuery {
+    self.path = path
+    return try await start()
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// given path as new data arrives. The returned ``StreamingQuery`` object can be used to interact
+  /// with the stream. Throws exceptions if the following conditions are met:
+  /// - Another run of the same streaming query, that is a streaming query sharing the same
+  ///   checkpoint location, is already active on the same Spark Driver
+  /// - The SQL configuration `spark.sql.streaming.stopActiveRunOnRestart` is enabled
+  /// - The active run cannot be stopped within the timeout controlled by the SQL configuration
+  ///   `spark.sql.streaming.stopTimeout`
+  ///
+  /// - Returns: A ``StreamingQuery``.
+  public func start() async throws -> StreamingQuery {
+    var writeStreamOperationStart = WriteStreamOperationStart()
+    writeStreamOperationStart.input = (await df.getPlan() as! Plan).root
+    if let source = self.source {
+      writeStreamOperationStart.format = source
+    }
+    writeStreamOperationStart.options = self.extraOptions.toStringDictionary()
+    if let partitioningColumns = self.partitioningColumns {
+      writeStreamOperationStart.partitioningColumnNames = partitioningColumns
+    }
+    if let clusteringColumns = self.clusteringColumns {
+      writeStreamOperationStart.clusteringColumnNames = clusteringColumns
+    }
+    writeStreamOperationStart.trigger =
+      switch self.trigger {
+      case .ProcessingTimeTrigger(let intervalMs):
+        .processingTimeInterval("INTERVAL \(intervalMs) MILLISECOND")
+      case .OneTimeTrigger:
+        .once(true)
+      case .AvailableNowTrigger:
+        .availableNow(true)
+      case .ContinuousTrigger(let intervalMs):
+        .continuousCheckpointInterval("INTERVAL \(intervalMs) MILLISECOND")
+      default: .once(true)
+      }
+    if let outputMode = self.outputMode {
+      writeStreamOperationStart.outputMode = outputMode
+    }
+    if let queryName = self.queryName {
+      writeStreamOperationStart.queryName = queryName
+    }
+    if let path = self.path {
+      writeStreamOperationStart.sinkDestination = .path(path)
+    }
+    if let tableName = self.tableName {
+      writeStreamOperationStart.sinkDestination = .tableName(tableName)
+    }
+
+    var command = Spark_Connect_Command()
+    command.writeStreamOperationStart = writeStreamOperationStart
+
+    let response = try await df.spark.client.execute(df.spark.sessionID, command)
+    let result = response.first!.writeStreamOperationStartResult
+    if result.hasQueryStartedEventJson {
+      // TODO: post
+    }
+
+    let query = try await StreamingQuery(
+      UUID(uuidString: result.queryID.id)!,
+      UUID(uuidString: result.queryID.runID)!,
+      result.name,
+      self.df.sparkSession
+    )
+
+    return query
+  }
+
+  /// Starts the execution of the streaming query, which will continually output results to the
+  /// given table as new data arrives. The returned ``StreamingQuery`` object can be used to interact
+  /// with the stream.
+  /// - Parameter tableName: A table name.
+  /// - Returns: A ``StreamingQuery``.
+  public func toTable(tableName: String) async throws -> StreamingQuery {
+    self.tableName = tableName
+    return try await start()
+  }
+}
diff --git a/Sources/SparkConnect/SparkSession.swift b/Sources/SparkConnect/SparkSession.swift
index e588ace..392f387 100644
--- a/Sources/SparkConnect/SparkSession.swift
+++ b/Sources/SparkConnect/SparkSession.swift
@@ -194,7 +194,7 @@ public actor SparkSession {
   /// Returns a DataFrameReader for reading data in various formats.
   ///
   /// The DataFrameReader provides methods to load data from external storage systems
-  /// such as file systems, databases, and streaming sources.
+  /// such as file systems and databases.
   ///
   /// ```swift
   /// // Read a CSV file
@@ -208,14 +208,31 @@ public actor SparkSession {
   ///   .json("path/to/file.json")
   ///
   /// // Read an ORC file
-  /// let parquetData = spark.read
+  /// let orcData = spark.read
   ///   .orc("path/to/file.orc")
   /// ```
   ///
   /// - Returns: A DataFrameReader instance configured for this session
   public var read: DataFrameReader {
     get {
-      return DataFrameReader(sparkSession: self)
+      DataFrameReader(sparkSession: self)
     }
   }
+
+  /// Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
+  ///
+  /// The DataStreamReader provides methods to load data from external storage systems
+  /// such as file systems, databases, and streaming sources.
+  ///
+  /// ```swift
+  /// // Read an ORC file
+  /// let orcData = spark.readStream.orc("path/to/file.orc")
+  /// ```
+  ///
+  /// - Returns: A DataStreamReader instance configured for this session
+  public var readStream: DataStreamReader {
+    get {
+      DataStreamReader(sparkSession: self)
+    }
+  }
diff --git a/Sources/SparkConnect/StreamingQuery.swift b/Sources/SparkConnect/StreamingQuery.swift
index 6d62f2e..af3459f 100644
--- a/Sources/SparkConnect/StreamingQuery.swift
+++ b/Sources/SparkConnect/StreamingQuery.swift
@@ -64,8 +64,8 @@ public actor StreamingQuery: Sendable {
     _ command: StreamingQueryCommand.OneOf_Command
   ) async throws -> [ExecutePlanResponse] {
     return try await self.sparkSession.client.executeStreamingQueryCommand(
-      self.id.uuidString,
-      self.runId.uuidString,
+      self.id.uuidString.lowercased(),
+      self.runId.uuidString.lowercased(),
       command
     )
   }
diff --git a/Sources/SparkConnect/TypeAliases.swift b/Sources/SparkConnect/TypeAliases.swift
index cb7864b..e0543b6 100644
--- a/Sources/SparkConnect/TypeAliases.swift
+++ b/Sources/SparkConnect/TypeAliases.swift
@@ -63,4 +63,5 @@ typealias UnresolvedAttribute = Spark_Connect_Expression.UnresolvedAttribute
 typealias WithColumnsRenamed = Spark_Connect_WithColumnsRenamed
 typealias WriteOperation = Spark_Connect_WriteOperation
 typealias WriteOperationV2 = Spark_Connect_WriteOperationV2
+typealias WriteStreamOperationStart = Spark_Connect_WriteStreamOperationStart
 typealias YearMonthInterval = Spark_Connect_DataType.YearMonthInterval
diff --git a/Tests/SparkConnectTests/DataStreamTests.swift b/Tests/SparkConnectTests/DataStreamTests.swift
new file mode 100644
index 0000000..00af6ae
--- /dev/null
+++ b/Tests/SparkConnectTests/DataStreamTests.swift
@@ -0,0 +1,69 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+import Foundation
+import SparkConnect
+import Testing
+
+/// A test suite for `DataStreamReader` and `DataStreamWriter`
+@Suite(.serialized)
+struct DataStreamTests {
+  @Test
+  func query() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+
+    // Prepare directories
+    let input = "/tmp/input-" + UUID().uuidString
+    let checkpoint = "/tmp/checkpoint-" + UUID().uuidString
+    let output = "/tmp/output-" + UUID().uuidString
+    try await spark.range(2025).write.orc(input)
+
+    // Create a streaming dataframe.
+    let df =
+      try await spark
+      .readStream
+      .schema("id LONG")
+      .orc(input)
+    #expect(try await df.isStreaming())
+
+    // Processing
+    let df2 = await df.selectExpr("id", "id * 10 as value")
+
+    // Start a streaming query
+    let query =
+      try await df2
+      .writeStream
+      .option("checkpointLocation", checkpoint)
+      .outputMode("append")
+      .format("orc")
+      .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
+      .start(output)
+    #expect(try await query.isActive)
+    // Wait for processing
+    try await Task.sleep(nanoseconds: 2_000_000_000)
+
+    try await query.stop()
+    #expect(try await query.isActive == false)
+
+    let df3 = await spark.read.orc(output)
+    #expect(try await df3.dtypes.count == 2)
+    #expect(try await df3.count() == 2025)
+    await spark.stop()
+  }
+}

From aead0f25505ae95911c22dbe9829a1847f7c0a8c Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 11 May 2025 09:14:28 -0700
Subject: [PATCH 2/2] doc

---
 Sources/SparkConnect/DataFrame.swift | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 34fa77d..a34c98b 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -113,6 +113,8 @@ import Synchronization
 /// - ``unpivot(_:_:_:_:)``
 /// - ``melt(_:_:_:)``
 /// - ``melt(_:_:_:_:)``
+/// - ``transpose()``
+/// - ``transpose(_:)``
 ///
 /// ### Join Operations
 /// - ``join(_:)``
@@ -166,6 +168,7 @@ import Synchronization
 /// ### Write Operations
 /// - ``write``
 /// - ``writeTo(_:)``
+/// - ``writeStream``
 ///
 /// ### Sampling
 /// - ``sample(_:_:_:)``
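Usage sketch (editor's note, not part of the patches above): a minimal end-to-end example of the API added by PATCH 1/2, mirroring DataStreamTests; the /tmp paths, the "id LONG" schema, and the 1-second trigger interval are illustrative assumptions.

import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()

// Define a streaming DataFrame over ORC files arriving in an input directory.
let stream = try await spark.readStream
  .schema("id LONG")
  .orc("/tmp/stream-input")

// Continuously write the stream back out as ORC, checkpointing progress.
let query = try await stream.writeStream
  .option("checkpointLocation", "/tmp/stream-checkpoint")
  .outputMode("append")
  .format("orc")
  .trigger(Trigger.ProcessingTimeTrigger(intervalMs: 1000))
  .start("/tmp/stream-output")

// Stop the query once enough data has been processed.
try await query.stop()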