Commit 8dedd94

[SPARK-51620] Support `columns` for `DataFrame`
### What changes were proposed in this pull request?

This PR aims to support the `columns` API for `DataFrame`.

### Why are the changes needed?

For feature parity.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #27 from dongjoon-hyun/SPARK-51620.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 672ce65 · commit 8dedd94
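For orientation, a minimal usage sketch of the new API, assuming a running Spark Connect server; the SQL text is illustrative, and the entry points match the tests further below:

```swift
import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()

// columns() analyzes the plan once, caches the schema, and returns the
// top-level field names of the result.
let names = try await spark.sql("SELECT 1 AS id, 'a' AS label").columns()
print(names)  // ["id", "label"]

await spark.stop()
```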

File tree: 2 files changed (+35, -3 lines)

Sources/SparkConnect/DataFrame.swift

Lines changed: 25 additions & 3 deletions
```diff
@@ -71,11 +71,28 @@ public actor DataFrame: Sendable {
     throw SparkConnectError.UnsupportedOperationException
   }
 
+  /// Return an array of column name strings
+  /// - Returns: a string array
+  public func columns() async throws -> [String] {
+    var columns: [String] = []
+    try await analyzePlanIfNeeded()
+    for field in self.schema!.struct.fields {
+      columns.append(field.name)
+    }
+    return columns
+  }
+
   /// Return a `JSON` string of data type because we cannot expose the internal type ``DataType``.
   /// - Returns: a `JSON` string.
   public func schema() async throws -> String {
-    var dataType: String? = nil
+    try await analyzePlanIfNeeded()
+    return try self.schema!.jsonString()
+  }
 
+  private func analyzePlanIfNeeded() async throws {
+    if self.schema != nil {
+      return
+    }
     try await withGRPCClient(
       transport: .http2NIOPosix(
         target: .dns(host: spark.client.host, port: spark.client.port),
@@ -85,9 +102,8 @@ public actor DataFrame: Sendable {
       let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
       let response = try await service.analyzePlan(
         spark.client.getAnalyzePlanRequest(spark.sessionID, plan))
-      dataType = try response.schema.schema.jsonString()
+      self.setSchema(response.schema.schema)
     }
-    return dataType!
   }
 
   /// Return the total number of rows.
```
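The refactor above moves the AnalyzePlan RPC into a private `analyzePlanIfNeeded()` helper that returns early once `self.schema` is populated, so `columns()` and `schema()` share one cached analysis. A minimal sketch of that memoize-on-first-use shape in an actor; `fetchSchemaJSON()` is a hypothetical stand-in for the gRPC round trip:

```swift
// Sketch of the caching shape only; fetchSchemaJSON() is a hypothetical
// stand-in for the AnalyzePlan gRPC round trip in the hunks above.
actor LazilyAnalyzed {
  private var schema: String? = nil

  /// Runs the expensive analysis at most once; later callers reuse the cache.
  func analyzeIfNeeded() async throws -> String {
    if let cached = schema {
      return cached  // already analyzed; skip the network round trip
    }
    let fetched = try await fetchSchemaJSON()
    schema = fetched
    return fetched
  }

  private func fetchSchemaJSON() async throws -> String {
    try await Task.sleep(nanoseconds: 10_000_000)  // simulate network latency
    return #"{"type":"struct","fields":[]}"#
  }
}
```

One design note: actors can interleave at `await` points, so two concurrent first calls may both perform the fetch; the write is idempotent, which matches the diff's `setSchema(_:)` behavior.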
```diff
@@ -266,6 +282,8 @@ public actor DataFrame: Sendable {
     return try await select().limit(1).count() == 0
   }
 
+  /// Persist this `DataFrame` with the default storage level (`MEMORY_AND_DISK`).
+  /// - Returns: A `DataFrame`.
   public func cache() async throws -> DataFrame {
     return try await persist()
   }
@@ -291,6 +309,10 @@ public actor DataFrame: Sendable {
     return self
   }
 
+  /// Mark the `DataFrame` as non-persistent, and remove all blocks for it from memory and disk.
+  /// This will not un-persist any cached data that is built upon this `DataFrame`.
+  /// - Parameter blocking: Whether to block until all blocks are deleted.
+  /// - Returns: A `DataFrame`
   public func unpersist(blocking: Bool = false) async throws -> DataFrame {
     try await withGRPCClient(
       transport: .http2NIOPosix(
```
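The two documentation hunks above cover `cache()` and `unpersist(blocking:)`. A short usage sketch, assuming a reachable Spark Connect server; the SQL text and the `count()` action are illustrative:

```swift
import SparkConnect

let spark = try await SparkSession.builder.getOrCreate()
let df = try await spark.sql("SELECT * FROM range(100)")

_ = try await df.cache()                    // persist with MEMORY_AND_DISK
_ = try await df.count()                    // first action materializes the cache
_ = try await df.unpersist()                // non-blocking by default
_ = try await df.unpersist(blocking: true)  // wait until all blocks are deleted

await spark.stop()
```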

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 10 additions & 0 deletions
```diff
@@ -32,6 +32,16 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func columns() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.sql("SELECT 1 as col1").columns() == ["col1"])
+    #expect(try await spark.sql("SELECT 1 as col1, 2 as col2").columns() == ["col1", "col2"])
+    #expect(try await spark.sql("SELECT CAST(null as STRING) col1").columns() == ["col1"])
+    #expect(try await spark.sql("DROP TABLE IF EXISTS nonexistent").columns() == [])
+    await spark.stop()
+  }
+
   @Test
   func schema() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
```
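Note the last assertion: a command such as `DROP TABLE` analyzes to a schema with no fields, so `columns()` returns an empty array rather than throwing.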
