Commit f073e56
[SPARK-51702] Revise sparkSession/read/write/columns/schema/dtypes/storageLevel API
### What changes were proposed in this pull request?

This PR aims to revise `sparkSession/read/write/columns/schema/dtypes/storageLevel` API to resemble the other language clients' APIs.

### Why are the changes needed?

To provide a consistent UX to the users.

### Does this PR introduce _any_ user-facing change?

- Change `func` to `var` like the following.

```swift
- public func schema() async throws -> String {
+ public var schema: String {
+   get async throws {
```

- Add `public` explicitly.
- Revise test cases accordingly.

### How was this patch tested?

Pass the CIs and manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #39 from dongjoon-hyun/SPARK-51702.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c1817d7 · commit f073e56
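The pattern behind this change is Swift's effectful read-only property (SE-0310): a computed `var` whose getter is marked `get async throws`, so call sites write `try await df.schema` instead of `try await df.schema()`. A minimal, self-contained sketch of the shape used here (the `Example` type and its members are illustrative, not part of this patch):

```swift
actor Example {
  private var cached: String? = nil

  /// A read-only property with an effectful getter. Call sites drop the
  /// parentheses but keep `try await`: `try await example.schema`.
  public var schema: String {
    get async throws {
      if cached == nil {
        cached = try await fetchFromServer()
      }
      return cached!
    }
  }

  /// Stand-in for an RPC to the server; here it only simulates latency.
  private func fetchFromServer() async throws -> String {
    try await Task.sleep(nanoseconds: 1_000_000)
    return #"{"struct":{}}"#
  }
}
```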

File tree

3 files changed: +41 -37 lines changed

Sources/SparkConnect/DataFrame.swift
Sources/SparkConnect/SparkSession.swift
Tests/SparkConnectTests/DataFrameTests.swift


Sources/SparkConnect/DataFrame.swift

Lines changed: 26 additions & 22 deletions
```diff
@@ -29,7 +29,7 @@ import Synchronization
 public actor DataFrame: Sendable {
   var spark: SparkSession
   var plan: Plan
-  var schema: DataType? = nil
+  private var _schema: DataType? = nil
   private var batches: [RecordBatch] = [RecordBatch]()
 
   /// Create a new `DataFrame`instance with the given Spark session and plan.
@@ -57,7 +57,7 @@ public actor DataFrame: Sendable {
   /// Set the schema. This is used to store the analized schema response from `Spark Connect` server.
   /// - Parameter schema: <#schema description#>
   private func setSchema(_ schema: DataType) {
-    self.schema = schema
+    self._schema = schema
   }
 
   /// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
@@ -67,9 +67,10 @@ public actor DataFrame: Sendable {
   }
 
   /// Return the `SparkSession` of this `DataFrame`.
-  /// - Returns: A `SparkSession`
-  public func sparkSession() -> SparkSession {
-    return self.spark
+  public var sparkSession: SparkSession {
+    get async throws {
+      return self.spark
+    }
   }
 
   /// A method to access the underlying Spark's `RDD`.
@@ -82,32 +83,35 @@ public actor DataFrame: Sendable {
   }
 
   /// Return an array of column name strings
-  /// - Returns: a string array
-  public func columns() async throws -> [String] {
-    var columns: [String] = []
-    try await analyzePlanIfNeeded()
-    for field in self.schema!.struct.fields {
-      columns.append(field.name)
+  public var columns: [String] {
+    get async throws {
+      var columns: [String] = []
+      try await analyzePlanIfNeeded()
+      for field in self._schema!.struct.fields {
+        columns.append(field.name)
+      }
+      return columns
     }
-    return columns
   }
 
   /// Return a `JSON` string of data type because we cannot expose the internal type ``DataType``.
-  /// - Returns: a `JSON` string.
-  public func schema() async throws -> String {
-    try await analyzePlanIfNeeded()
-    return try self.schema!.jsonString()
+  public var schema: String {
+    get async throws {
+      try await analyzePlanIfNeeded()
+      return try self._schema!.jsonString()
+    }
   }
 
-  var dtypes: [(String, String)] {
+  /// Returns all column names and their data types as an array.
+  public var dtypes: [(String, String)] {
     get async throws {
       try await analyzePlanIfNeeded()
-      return try self.schema!.struct.fields.map { ($0.name, try $0.dataType.simpleString) }
+      return try self._schema!.struct.fields.map { ($0.name, try $0.dataType.simpleString) }
    }
   }
 
   private func analyzePlanIfNeeded() async throws {
-    if self.schema != nil {
+    if self._schema != nil {
       return
     }
     try await withGRPCClient(
@@ -224,7 +228,7 @@ public actor DataFrame: Sendable {
   public func show() async throws {
     try await execute()
 
-    if let schema = self.schema {
+    if let schema = self._schema {
       var columns: [TextTableColumn] = []
       for f in schema.struct.fields {
         columns.append(TextTableColumn(header: f.name))
@@ -342,7 +346,7 @@ public actor DataFrame: Sendable {
     return self
   }
 
-  var storageLevel: StorageLevel {
+  public var storageLevel: StorageLevel {
     get async throws {
       try await withGRPCClient(
         transport: .http2NIOPosix(
@@ -403,7 +407,7 @@ public actor DataFrame: Sendable {
   }
 
   /// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
-  var write: DataFrameWriter {
+  public var write: DataFrameWriter {
     get {
       return DataFrameWriter(df: self)
     }
```
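Taken together, the `DataFrame` getters are now consumed as properties. A short usage sketch against a running Spark Connect server (the SQL literal and the commented results are illustrative, not from the patch):

```swift
import SparkConnect

func demo() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let df = try await spark.sql("SELECT 1 AS id, 'a' AS name")

  // Each getter is `get async throws`, so property access still needs
  // `try await`; only the trailing `()` goes away.
  let names = try await df.columns   // e.g. ["id", "name"]
  let json = try await df.schema     // schema as a JSON string
  let types = try await df.dtypes    // e.g. [("id", "int"), ("name", "string")]
  print(names, json, types)

  await spark.stop()
}
```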

Sources/SparkConnect/SparkSession.swift

Lines changed: 3 additions & 3 deletions
```diff
@@ -75,7 +75,7 @@ public actor SparkSession {
   var serverSideSessionID: String = ""
 
   /// A variable for ``SparkContext``. This is designed to throw exceptions by Apache Spark.
-  var sparkContext: SparkContext {
+  public var sparkContext: SparkContext {
     get throws {
       // SQLSTATE: 0A000
       // [UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT]
@@ -119,7 +119,7 @@ public actor SparkSession {
 
   /// Returns a ``DataFrameReader`` that can be used to read non-streaming data in as a
   /// `DataFrame`
-  var read: DataFrameReader {
+  public var read: DataFrameReader {
     get {
       return DataFrameReader(sparkSession: self)
     }
@@ -140,7 +140,7 @@ public actor SparkSession {
 /// This is defined as the return type of `SparkSession.sparkContext` method.
 /// This is an empty `Struct` type because `sparkContext` method is designed to throw
 /// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.
-struct SparkContext {
+public struct SparkContext: Sendable {
 }
 
 /// A builder to create ``SparkSession``
```
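On the session side, `read` and `sparkContext` are now reachable from outside the module. A brief sketch (`sparkContext` throws by design, as the doc comment in the diff above notes):

```swift
import SparkConnect

func sessionDemo() async throws {
  let spark = try await SparkSession.builder.getOrCreate()

  // `read` has a plain getter, so only `await` is needed to hop onto the actor.
  let reader = await spark.read
  _ = reader

  // `sparkContext` is intentionally unsupported under Spark Connect and
  // throws UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT when accessed.
  do {
    _ = try await spark.sparkContext
  } catch {
    print("expected: \(error)")
  }

  await spark.stop()
}
```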

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 12 additions & 12 deletions
```diff
@@ -26,7 +26,7 @@ struct DataFrameTests {
   @Test
   func sparkSession() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    #expect(try await spark.range(1).sparkSession() == spark)
+    #expect(try await spark.range(1).sparkSession == spark)
     await spark.stop()
   }
 
@@ -42,30 +42,30 @@ struct DataFrameTests {
   @Test
   func columns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    #expect(try await spark.sql("SELECT 1 as col1").columns() == ["col1"])
-    #expect(try await spark.sql("SELECT 1 as col1, 2 as col2").columns() == ["col1", "col2"])
-    #expect(try await spark.sql("SELECT CAST(null as STRING) col1").columns() == ["col1"])
-    #expect(try await spark.sql("DROP TABLE IF EXISTS nonexistent").columns() == [])
+    #expect(try await spark.sql("SELECT 1 as col1").columns == ["col1"])
+    #expect(try await spark.sql("SELECT 1 as col1, 2 as col2").columns == ["col1", "col2"])
+    #expect(try await spark.sql("SELECT CAST(null as STRING) col1").columns == ["col1"])
+    #expect(try await spark.sql("DROP TABLE IF EXISTS nonexistent").columns == [])
     await spark.stop()
   }
 
   @Test
   func schema() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
 
-    let schema1 = try await spark.sql("SELECT 'a' as col1").schema()
+    let schema1 = try await spark.sql("SELECT 'a' as col1").schema
     #expect(
       schema1
         == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
     )
 
-    let schema2 = try await spark.sql("SELECT 'a' as col1, 'b' as col2").schema()
+    let schema2 = try await spark.sql("SELECT 'a' as col1, 'b' as col2").schema
     #expect(
       schema2
         == #"{"struct":{"fields":[{"name":"col1","dataType":{"string":{"collation":"UTF8_BINARY"}}},{"name":"col2","dataType":{"string":{"collation":"UTF8_BINARY"}}}]}}"#
     )
 
-    let emptySchema = try await spark.sql("DROP TABLE IF EXISTS nonexistent").schema()
+    let emptySchema = try await spark.sql("DROP TABLE IF EXISTS nonexistent").schema
     #expect(emptySchema == #"{"struct":{}}"#)
     await spark.stop()
   }
@@ -136,15 +136,15 @@ struct DataFrameTests {
   @Test
   func selectNone() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let emptySchema = try await spark.range(1).select().schema()
+    let emptySchema = try await spark.range(1).select().schema
     #expect(emptySchema == #"{"struct":{}}"#)
     await spark.stop()
   }
 
   @Test
   func select() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let schema = try await spark.range(1).select("id").schema()
+    let schema = try await spark.range(1).select("id").schema
     #expect(
       schema
         == #"{"struct":{"fields":[{"name":"id","dataType":{"long":{}}}]}}"#
@@ -155,7 +155,7 @@ struct DataFrameTests {
   @Test
   func selectMultipleColumns() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let schema = try await spark.sql("SELECT * FROM VALUES (1, 2)").select("col2", "col1").schema()
+    let schema = try await spark.sql("SELECT * FROM VALUES (1, 2)").select("col2", "col1").schema
     #expect(
       schema
         == #"{"struct":{"fields":[{"name":"col2","dataType":{"integer":{}}},{"name":"col1","dataType":{"integer":{}}}]}}"#
@@ -167,7 +167,7 @@ struct DataFrameTests {
   func selectInvalidColumn() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     try await #require(throws: Error.self) {
-      let _ = try await spark.range(1).select("invalid").schema()
+      let _ = try await spark.range(1).select("invalid").schema
     }
     await spark.stop()
   }
```
