Skip to content

Commit c5f1ff7

Browse files
committed
[SPARK-52069] Support DataStreamReader and DataStreamWriter
### What changes were proposed in this pull request? This PR aims to support `DataStreamReader` and `DataStreamWriter`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly added test case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #126 from dongjoon-hyun/SPARK-52069. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 10f2854 commit c5f1ff7

File tree

7 files changed

+533
-6
lines changed

7 files changed

+533
-6
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ import Synchronization
113113
/// - ``unpivot(_:_:_:_:)``
114114
/// - ``melt(_:_:_:)``
115115
/// - ``melt(_:_:_:_:)``
116+
/// - ``transpose()``
117+
/// - ``transpose(_:)``
116118
///
117119
/// ### Join Operations
118120
/// - ``join(_:)``
@@ -166,6 +168,7 @@ import Synchronization
166168
/// ### Write Operations
167169
/// - ``write``
168170
/// - ``writeTo(_:)``
171+
/// - ``writeStream``
169172
///
170173
/// ### Sampling
171174
/// - ``sample(_:_:_:)``
@@ -1381,7 +1384,7 @@ public actor DataFrame: Sendable {
13811384
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
13821385
public var write: DataFrameWriter {
13831386
get {
1384-
return DataFrameWriter(df: self)
1387+
DataFrameWriter(df: self)
13851388
}
13861389
}
13871390

@@ -1391,4 +1394,11 @@ public actor DataFrame: Sendable {
13911394
public func writeTo(_ table: String) -> DataFrameWriterV2 {
13921395
return DataFrameWriterV2(table, self)
13931396
}
1397+
1398+
/// Returns a ``DataStreamWriter`` that can be used to write streaming data.
1399+
public var writeStream: DataStreamWriter {
1400+
get {
1401+
DataStreamWriter(df: self)
1402+
}
1403+
}
13941404
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
//
2+
// Licensed to the Apache Software Foundation (ASF) under one
3+
// or more contributor license agreements. See the NOTICE file
4+
// distributed with this work for additional information
5+
// regarding copyright ownership. The ASF licenses this file
6+
// to you under the Apache License, Version 2.0 (the
7+
// "License"); you may not use this file except in compliance
8+
// with the License. You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing,
13+
// software distributed under the License is distributed on an
14+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
// KIND, either express or implied. See the License for the
16+
// specific language governing permissions and limitations
17+
// under the License.
18+
//
19+
import Foundation
20+
21+
/// An actor to load a streaming `Dataset` from external storage systems
22+
/// (e.g. file systems, key-value stores, etc). Use `SparkSession.readStream` to access this.
23+
public actor DataStreamReader: Sendable {
24+
var source: String = ""
25+
26+
var paths: [String] = []
27+
28+
var extraOptions: CaseInsensitiveDictionary = CaseInsensitiveDictionary([:])
29+
30+
var userSpecifiedSchemaDDL: String? = nil
31+
32+
let sparkSession: SparkSession
33+
34+
init(sparkSession: SparkSession) {
35+
self.sparkSession = sparkSession
36+
}
37+
38+
/// Specifies the input data source format.
39+
/// - Parameter source: A string.
40+
/// - Returns: A ``DataStreamReader``.
41+
public func format(_ source: String) -> DataStreamReader {
42+
self.source = source
43+
return self
44+
}
45+
46+
/// Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
47+
/// automatically from data. By specifying the schema here, the underlying data source can skip
48+
/// the schema inference step, and thus speed up data loading.
49+
/// - Parameter schema: A DDL schema string.
50+
/// - Returns: A `DataStreamReader`.
51+
@discardableResult
52+
public func schema(_ schema: String) async throws -> DataStreamReader {
53+
// Validate by parsing.
54+
do {
55+
try await sparkSession.client.ddlParse(schema)
56+
} catch {
57+
throw SparkConnectError.InvalidTypeException
58+
}
59+
self.userSpecifiedSchemaDDL = schema
60+
return self
61+
}
62+
63+
/// Adds an input option for the underlying data source.
64+
/// - Parameters:
65+
/// - key: A key string.
66+
/// - value: A value string.
67+
/// - Returns: A `DataStreamReader`.
68+
public func option(_ key: String, _ value: String) -> DataStreamReader {
69+
self.extraOptions[key] = value
70+
return self
71+
}
72+
73+
/// Adds an input option for the underlying data source.
74+
/// - Parameters:
75+
/// - key: A key string.
76+
/// - value: A `Bool` value.
77+
/// - Returns: A `DataStreamReader`.
78+
public func option(_ key: String, _ value: Bool) -> DataStreamReader {
79+
self.extraOptions[key] = String(value)
80+
return self
81+
}
82+
83+
/// Adds an input option for the underlying data source.
84+
/// - Parameters:
85+
/// - key: A key string.
86+
/// - value: A `Int64` value.
87+
/// - Returns: A `DataStreamReader`.
88+
public func option(_ key: String, _ value: Int64) -> DataStreamReader {
89+
self.extraOptions[key] = String(value)
90+
return self
91+
}
92+
93+
/// Adds an input option for the underlying data source.
94+
/// - Parameters:
95+
/// - key: A key string.
96+
/// - value: A `Double` value.
97+
/// - Returns: A `DataStreamReader`.
98+
public func option(_ key: String, _ value: Double) -> DataStreamReader {
99+
self.extraOptions[key] = String(value)
100+
return self
101+
}
102+
103+
/// Adds input options for the underlying data source.
104+
/// - Parameter options: A string-string dictionary.
105+
/// - Returns: A `DataStreamReader`.
106+
public func options(_ options: [String: String]) -> DataStreamReader {
107+
for (key, value) in options {
108+
self.extraOptions[key] = value
109+
}
110+
return self
111+
}
112+
113+
/// Loads input data stream in as a `DataFrame`, for data streams that don't require a path
114+
/// (e.g. external key-value stores).
115+
/// - Returns: A `DataFrame`.
116+
public func load() -> DataFrame {
117+
return load([])
118+
}
119+
120+
/// Loads input data stream in as a `DataFrame`, for data streams that require a path
121+
/// (e.g. data backed by a local or distributed file system).
122+
/// - Parameter path: A path string.
123+
/// - Returns: A `DataFrame`.
124+
public func load(_ path: String) -> DataFrame {
125+
return load([path])
126+
}
127+
128+
func load(_ paths: [String]) -> DataFrame {
129+
self.paths = paths
130+
131+
var dataSource = DataSource()
132+
dataSource.format = self.source
133+
dataSource.paths = self.paths
134+
dataSource.options = self.extraOptions.toStringDictionary()
135+
if let userSpecifiedSchemaDDL = self.userSpecifiedSchemaDDL {
136+
dataSource.schema = userSpecifiedSchemaDDL
137+
}
138+
139+
var read = Read()
140+
read.dataSource = dataSource
141+
read.isStreaming = true
142+
143+
var relation = Relation()
144+
relation.read = read
145+
146+
var plan = Plan()
147+
plan.opType = .root(relation)
148+
149+
return DataFrame(spark: sparkSession, plan: plan)
150+
}
151+
152+
/// Define a Streaming DataFrame on a Table. The DataSource corresponding to the table should
153+
/// support streaming mode.
154+
/// - Parameter tableName: The name of the table.
155+
/// - Returns: A ``DataFrame``.
156+
public func table(_ tableName: String) -> DataFrame {
157+
var namedTable = NamedTable()
158+
namedTable.unparsedIdentifier = tableName
159+
namedTable.options = self.extraOptions.toStringDictionary()
160+
161+
var read = Read()
162+
read.namedTable = namedTable
163+
read.isStreaming = true
164+
165+
var relation = Relation()
166+
relation.read = read
167+
168+
var plan = Plan()
169+
plan.opType = .root(relation)
170+
171+
return DataFrame(spark: sparkSession, plan: plan)
172+
}
173+
174+
/// Loads a text file stream and returns the result as a `DataFrame`.
175+
/// - Parameter path: A path string
176+
/// - Returns: A `DataFrame`.
177+
public func text(_ path: String) -> DataFrame {
178+
self.source = "text"
179+
return load(path)
180+
}
181+
182+
/// Loads a CSV file stream and returns the result as a `DataFrame`.
183+
/// - Parameter path: A path string
184+
/// - Returns: A `DataFrame`.
185+
public func csv(_ path: String) -> DataFrame {
186+
self.source = "csv"
187+
return load(path)
188+
}
189+
190+
/// Loads a JSON file stream and returns the result as a `DataFrame`.
191+
/// - Parameter path: A path string
192+
/// - Returns: A `DataFrame`.
193+
public func json(_ path: String) -> DataFrame {
194+
self.source = "json"
195+
return load(path)
196+
}
197+
198+
/// Loads an XML file stream and returns the result as a `DataFrame`.
199+
/// - Parameter path: A path string
200+
/// - Returns: A `DataFrame`.
201+
public func xml(_ path: String) -> DataFrame {
202+
self.source = "xml"
203+
return load(path)
204+
}
205+
206+
/// Loads an ORC file stream and returns the result as a `DataFrame`.
207+
/// - Parameter path: A path string
208+
/// - Returns: A `DataFrame`.
209+
public func orc(_ path: String) -> DataFrame {
210+
self.source = "orc"
211+
return load(path)
212+
}
213+
214+
/// Loads a Parquet file stream and returns the result as a `DataFrame`.
215+
/// - Parameter path: A path string
216+
/// - Returns: A `DataFrame`.
217+
public func parquet(_ path: String) -> DataFrame {
218+
self.source = "parquet"
219+
return load(path)
220+
}
221+
}

0 commit comments

Comments
 (0)