Skip to content

Commit 49bb86c

Browse files
committed
[SPARK-51971] Improve DataFrame.collect to return the original values
### What changes were proposed in this pull request? This PR aims to improve `DataFrame.collect` to return the original values. Note that this PR provides simple value types first. More types like `Decimal` will be added later. ### Why are the changes needed? The initial implementation has a limitation to return rows of `String` values. ### Does this PR introduce _any_ user-facing change? No, because there is no released versions yet. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #99 from dongjoon-hyun/SPARK-51971. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent ac3c85e commit 49bb86c

File tree

7 files changed

+117
-100
lines changed

7 files changed

+117
-100
lines changed

Sources/SparkConnect/Catalog.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public actor Catalog: Sendable {
252252
catalog.tableExists = tableExists
253253
return catalog
254254
})
255-
return "true" == (try await df.collect().first!.get(0) as! String)
255+
return try await df.collect()[0].getAsBool(0)
256256
}
257257

258258
/// Check if the table or view with the specified name exists. This can either be a temporary
@@ -270,7 +270,7 @@ public actor Catalog: Sendable {
270270
catalog.tableExists = tableExists
271271
return catalog
272272
})
273-
return "true" == (try await df.collect().first!.get(0) as! String)
273+
return try await df.collect()[0].getAsBool(0)
274274
}
275275

276276
/// Check if the function with the specified name exists. This can either be a temporary function
@@ -287,7 +287,7 @@ public actor Catalog: Sendable {
287287
catalog.functionExists = functionExists
288288
return catalog
289289
})
290-
return "true" == (try await df.collect().first!.get(0) as! String)
290+
return try await df.collect()[0].getAsBool(0)
291291
}
292292

293293
/// Check if the function with the specified name exists in the specified database under the Hive
@@ -305,7 +305,7 @@ public actor Catalog: Sendable {
305305
catalog.functionExists = functionExists
306306
return catalog
307307
})
308-
return "true" == (try await df.collect().first!.get(0) as! String)
308+
return try await df.collect()[0].getAsBool(0)
309309
}
310310

311311
/// Caches the specified table in-memory.
@@ -338,7 +338,7 @@ public actor Catalog: Sendable {
338338
catalog.isCached = isCached
339339
return catalog
340340
})
341-
return "true" == (try await df.collect().first!.get(0) as! String)
341+
return try await df.collect()[0].getAsBool(0)
342342
}
343343

344344
/// Invalidates and refreshes all the cached data and metadata of the given table.
@@ -407,7 +407,7 @@ public actor Catalog: Sendable {
407407
catalog.dropTempView = dropTempView
408408
return catalog
409409
})
410-
return "true" == (try await df.collect().first!.get(0) as! String)
410+
return try await df.collect().first!.getAsBool(0)
411411
}
412412

413413
/// Drops the global temporary view with the given view name in the catalog. If the view has been
@@ -423,6 +423,6 @@ public actor Catalog: Sendable {
423423
catalog.dropGlobalTempView = dropGlobalTempView
424424
return catalog
425425
})
426-
return "true" == (try await df.collect().first!.get(0) as! String)
426+
return try await df.collect()[0].getAsBool(0)
427427
}
428428
}

Sources/SparkConnect/DataFrame.swift

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,34 @@ public actor DataFrame: Sendable {
208208
for i in 0..<batch.length {
209209
var values: [Sendable?] = []
210210
for column in batch.columns {
211-
let str = column.array as! AsString
212211
if column.data.isNull(i) {
213212
values.append(nil)
214-
} else if column.data.type.info == ArrowType.ArrowBinary {
215-
let binary = str.asString(i).utf8.map { String(format: "%02x", $0) }.joined(separator: " ")
216-
values.append("[\(binary)]")
217213
} else {
218-
values.append(str.asString(i))
214+
let array = column.array
215+
switch column.data.type.info {
216+
case .primitiveInfo(.boolean):
217+
values.append(array.asAny(i) as? Bool)
218+
case .primitiveInfo(.int8):
219+
values.append(array.asAny(i) as? Int8)
220+
case .primitiveInfo(.int16):
221+
values.append(array.asAny(i) as? Int16)
222+
case .primitiveInfo(.int32):
223+
values.append(array.asAny(i) as? Int32)
224+
case .primitiveInfo(.int64):
225+
values.append(array.asAny(i) as! Int64)
226+
case .primitiveInfo(.float):
227+
values.append(array.asAny(i) as? Float)
228+
case .primitiveInfo(.double):
229+
values.append(array.asAny(i) as? Double)
230+
case .primitiveInfo(.date32):
231+
values.append(array.asAny(i) as! Date)
232+
case ArrowType.ArrowBinary:
233+
values.append((array as! AsString).asString(i).utf8)
234+
case .complexInfo(.strct):
235+
values.append((array as! AsString).asString(i))
236+
default:
237+
values.append(array.asAny(i) as? String)
238+
}
219239
}
220240
}
221241
result.append(Row(valueArray: values))

Sources/SparkConnect/Row.swift

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public struct Row: Sendable, Equatable {
5050
return values[i]
5151
}
5252

53+
public func getAsBool(_ i: Int) throws -> Bool {
54+
return try get(i) as! Bool
55+
}
56+
5357
public static func == (lhs: Row, rhs: Row) -> Bool {
5458
if lhs.values.count != rhs.values.count {
5559
return false
@@ -59,16 +63,8 @@ public struct Row: Sendable, Equatable {
5963
return true
6064
} else if let a = x as? Bool, let b = y as? Bool {
6165
return a == b
62-
} else if let a = x as? Int, let b = y as? Int {
63-
return a == b
64-
} else if let a = x as? Int8, let b = y as? Int8 {
65-
return a == b
66-
} else if let a = x as? Int16, let b = y as? Int16 {
67-
return a == b
68-
} else if let a = x as? Int32, let b = y as? Int32 {
69-
return a == b
70-
} else if let a = x as? Int64, let b = y as? Int64 {
71-
return a == b
66+
} else if let a = x as? any FixedWidthInteger, let b = y as? any FixedWidthInteger {
67+
return Int64(a) == Int64(b)
7268
} else if let a = x as? Float, let b = y as? Float {
7369
return a == b
7470
} else if let a = x as? Double, let b = y as? Double {

0 commit comments

Comments
 (0)