Commit 119eeea
[SPARK-51815] Add Row struct
### What changes were proposed in this pull request?

This PR aims to add `Row` struct and use it.

### Why are the changes needed?

To make `DataFrame` APIs return `Row` instead of `[String?]` in `Swift`.

```swift
- public func collect() async throws -> [[String?]] {
+ public func collect() async throws -> [Row] {

- public func head(_ n: Int32 = 1) async throws -> [[String?]] {
+ public func head(_ n: Int32 = 1) async throws -> [Row] {
```

Note that `Row` is added to support general type fields, but this PR replaces the existing API's `[String?]` signature with a `Row`-based signature. The detailed one-to-one mapping among other types will be handled later.

### Does this PR introduce _any_ user-facing change?

Yes, but this is a change to the unreleased version.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #63 from dongjoon-hyun/SPARK-51815.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 21724f5 commit 119eeea
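
For a sense of the caller-facing change, here is a minimal usage sketch (not part of the commit; it assumes a reachable Spark Connect server, such as the localhost default used by the test suite):

```swift
// Hedged sketch: collect() now returns [Row] instead of [[String?]].
// Field values are exposed as `Sendable?`, so callers cast to the expected type.
let spark = try await SparkSession.builder.getOrCreate()
let rows: [Row] = try await spark.sql("SELECT 'abc' AS c").collect()
for row in rows {
  // Values are string-rendered by the current collect() implementation.
  let c = try row.get(0) as? String
  print(c ?? "null")
}
await spark.stop()
```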

Some content is hidden by default for large commits.

43 files changed: +260 −47 lines

Sources/SparkConnect/Catalog.swift

Lines changed: 5 additions & 5 deletions
```diff
@@ -100,7 +100,7 @@ public actor Catalog: Sendable {
       catalog.catType = .currentCatalog(Spark_Connect_CurrentCatalog())
       return catalog
     })
-    return try await df.collect()[0][0]!
+    return try await df.collect()[0][0] as! String
   }

   /// Sets the current default catalog in this session.
@@ -130,7 +130,7 @@ public actor Catalog: Sendable {
       return catalog
     })
     return try await df.collect().map {
-      CatalogMetadata(name: $0[0]!, description: $0[1])
+      try CatalogMetadata(name: $0[0] as! String, description: $0[1] as? String)
     }
   }

@@ -142,7 +142,7 @@ public actor Catalog: Sendable {
       catalog.catType = .currentDatabase(Spark_Connect_CurrentDatabase())
       return catalog
     })
-    return try await df.collect()[0][0]!
+    return try await df.collect()[0][0] as! String
   }

   /// Sets the current default database in this session.
@@ -173,7 +173,7 @@ public actor Catalog: Sendable {
       return catalog
     })
     return try await df.collect().map {
-      Database(name: $0[0]!, catalog: $0[1], description: $0[2], locationUri: $0[3]!)
+      try Database(name: $0[0] as! String, catalog: $0[1] as? String, description: $0[2] as? String, locationUri: $0[3] as! String)
     }
   }

@@ -189,7 +189,7 @@ public actor Catalog: Sendable {
       return catalog
     })
     return try await df.collect().map {
-      Database(name: $0[0]!, catalog: $0[1], description: $0[2], locationUri: $0[3]!)
+      try Database(name: $0[0] as! String, catalog: $0[1] as? String, description: $0[2] as? String, locationUri: $0[3] as! String)
     }.first!
   }
```
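
The pattern above is how typed catalog fields are recovered from a `Row`: required columns are force-cast with `as!`, nullable columns conditionally cast with `as?`. A small illustrative sketch (hypothetical values; `Database` is the struct used in this file, and the throwing subscript is why `try` appears on these expressions):

```swift
// Illustrative only: mimics the in-module extraction pattern above.
let row = Row("default", nil, "default database", "file:/tmp/warehouse")
let db = try Database(
  name: row[0] as! String,         // required field: force-cast
  catalog: row[1] as? String,      // nullable field: conditional cast
  description: row[2] as? String,
  locationUri: row[3] as! String)
```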

Sources/SparkConnect/DataFrame.swift

Lines changed: 10 additions & 10 deletions
```diff
@@ -197,15 +197,15 @@ public actor DataFrame: Sendable {
     }
   }

-  /// Execute the plan and return the result as ``[[String?]]``.
-  /// - Returns: ``[[String?]]``
-  public func collect() async throws -> [[String?]] {
+  /// Execute the plan and return the result as ``[Row]``.
+  /// - Returns: ``[Row]``
+  public func collect() async throws -> [Row] {
     try await execute()

-    var result: [[String?]] = []
+    var result: [Row] = []
     for batch in self.batches {
       for i in 0..<batch.length {
-        var values: [String?] = []
+        var values: [Sendable?] = []
         for column in batch.columns {
           let str = column.array as! AsString
           if column.data.isNull(i) {
@@ -217,7 +217,7 @@ public actor DataFrame: Sendable {
             values.append(str.asString(i))
           }
         }
-        result.append(values)
+        result.append(Row(valueArray: values))
       }
     }

@@ -377,15 +377,15 @@ public actor DataFrame: Sendable {

   /// Returns the first `n` rows.
   /// - Parameter n: The number of rows. (default: 1)
-  /// - Returns: ``[[String?]]``
-  public func head(_ n: Int32 = 1) async throws -> [[String?]] {
+  /// - Returns: ``[Row]``
+  public func head(_ n: Int32 = 1) async throws -> [Row] {
     return try await limit(n).collect()
   }

   /// Returns the last `n` rows.
   /// - Parameter n: The number of rows.
-  /// - Returns: ``[[String?]]``
-  public func tail(_ n: Int32) async throws -> [[String?]] {
+  /// - Returns: ``[Row]``
+  public func tail(_ n: Int32) async throws -> [Row] {
     let lastN = DataFrame(spark:spark, plan: SparkConnectClient.getTail(self.plan.root, n))
     return try await lastN.collect()
   }
```
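
With `collect()` returning `[Row]`, iteration looks like the following hedged sketch (again assuming a reachable Spark Connect server):

```swift
// Each Row currently holds string-rendered column values.
let spark = try await SparkSession.builder.getOrCreate()
for row in try await spark.range(3).collect() {
  print(row.toString())  // "[0]", "[1]", "[2]"
}
await spark.stop()
```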

Sources/SparkConnect/Row.swift

Lines changed: 90 additions & 0 deletions
```swift
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
import Foundation

public struct Row: Sendable, Equatable {
  let values: [Sendable?]

  public init(_ values: Sendable?...) {
    self.values = values
  }

  public init(valueArray: [Sendable?]) {
    self.values = valueArray
  }

  public static var empty: Row {
    return Row()
  }

  public var size: Int { return length }

  public var length: Int { return values.count }

  subscript(index: Int) -> Sendable {
    get throws {
      return try get(index)
    }
  }

  public func get(_ i: Int) throws -> Sendable {
    if i < 0 || i >= self.length {
      throw SparkConnectError.InvalidArgumentException
    }
    return values[i]
  }

  public static func == (lhs: Row, rhs: Row) -> Bool {
    if lhs.values.count != rhs.values.count {
      return false
    }
    return lhs.values.elementsEqual(rhs.values) { (x, y) in
      if x == nil && y == nil {
        return true
      } else if let a = x as? Bool, let b = y as? Bool {
        return a == b
      } else if let a = x as? Int, let b = y as? Int {
        return a == b
      } else if let a = x as? Int8, let b = y as? Int8 {
        return a == b
      } else if let a = x as? Int16, let b = y as? Int16 {
        return a == b
      } else if let a = x as? Int32, let b = y as? Int32 {
        return a == b
      } else if let a = x as? Int64, let b = y as? Int64 {
        return a == b
      } else if let a = x as? Float, let b = y as? Float {
        return a == b
      } else if let a = x as? Double, let b = y as? Double {
        return a == b
      } else if let a = x as? String, let b = y as? String {
        return a == b
      } else {
        return false
      }
    }
  }

  public func toString() -> String {
    return "[\(self.values.map { "\($0 ?? "null")" }.joined(separator: ","))]"
  }
}

extension Row {
}
```
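
A short sketch of the `Row` API in use (inside a throwing, in-module context, since the subscript is not public and both it and `get(_:)` can throw):

```swift
let row = Row("abc", 42, nil)

assert(row.length == 3 && row.size == 3)
assert(try! row.get(0) as? String == "abc")
assert(row == Row("abc", 42, nil))        // element-wise Equatable
assert(row.toString() == "[abc,42,null]") // nil renders as "null"

// Out-of-range access throws instead of trapping.
do {
  _ = try row.get(3)
} catch {
  print("invalid index: \(error)")
}
```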

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 10 additions & 10 deletions
```diff
@@ -249,15 +249,15 @@ struct DataFrameTests {
   @Test
   func sort() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let expected = (1...10).map{ [String($0)] }
+    let expected = Array((1...10).map{ Row(String($0)) })
     #expect(try await spark.range(10, 0, -1).sort("id").collect() == expected)
     await spark.stop()
   }

   @Test
   func orderBy() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
-    let expected = (1...10).map{ [String($0)] }
+    let expected = Array((1...10).map{ Row(String($0)) })
     #expect(try await spark.range(10, 0, -1).orderBy("id").collect() == expected)
     await spark.stop()
   }
@@ -284,28 +284,28 @@
     #expect(
       try await spark.sql(
         "SELECT * FROM VALUES (1, true, 'abc'), (null, null, null), (3, false, 'def')"
-      ).collect() == [["1", "true", "abc"], [nil, nil, nil], ["3", "false", "def"]])
+      ).collect() == [Row("1", "true", "abc"), Row(nil, nil, nil), Row("3", "false", "def")])
     await spark.stop()
   }

   @Test
   func head() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     #expect(try await spark.range(0).head().isEmpty)
-    #expect(try await spark.range(2).sort("id").head() == [["0"]])
-    #expect(try await spark.range(2).sort("id").head(1) == [["0"]])
-    #expect(try await spark.range(2).sort("id").head(2) == [["0"], ["1"]])
-    #expect(try await spark.range(2).sort("id").head(3) == [["0"], ["1"]])
+    #expect(try await spark.range(2).sort("id").head() == [Row("0")])
+    #expect(try await spark.range(2).sort("id").head(1) == [Row("0")])
+    #expect(try await spark.range(2).sort("id").head(2) == [Row("0"), Row("1")])
+    #expect(try await spark.range(2).sort("id").head(3) == [Row("0"), Row("1")])
     await spark.stop()
   }

   @Test
   func tail() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
     #expect(try await spark.range(0).tail(1).isEmpty)
-    #expect(try await spark.range(2).sort("id").tail(1) == [["1"]])
-    #expect(try await spark.range(2).sort("id").tail(2) == [["0"], ["1"]])
-    #expect(try await spark.range(2).sort("id").tail(3) == [["0"], ["1"]])
+    #expect(try await spark.range(2).sort("id").tail(1) == [Row("1")])
+    #expect(try await spark.range(2).sort("id").tail(2) == [Row("0"), Row("1")])
+    #expect(try await spark.range(2).sort("id").tail(3) == [Row("0"), Row("1")])
     await spark.stop()
   }
```
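
One caveat worth noting for test authors: `Row` equality only recognizes the primitive types enumerated in `==` (Bool, the fixed-width integers, Int, Float, Double, String); elements of any other type compare as unequal even when identical. A hedged illustration:

```swift
// Conservative equality: unsupported element types never compare equal.
#expect(Row("0") == Row("0"))        // String is supported
#expect(Row([1, 2]) != Row([1, 2]))  // arrays are not, so this is unequal
```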

(new file; name hidden by the page)

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+[[61 62 63]]
```

Tests/SparkConnectTests/Resources/queries/binary.sql.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

(new file; name hidden by the page)

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+
```

Tests/SparkConnectTests/Resources/queries/cache.sql.json

Lines changed: 0 additions & 1 deletion
This file was deleted.

(new file; name hidden by the page)

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+
```

Tests/SparkConnectTests/Resources/queries/clear_cache.sql.json

Lines changed: 0 additions & 1 deletion
This file was deleted.