@@ -29,7 +29,7 @@ import Synchronization
29
29
public actor DataFrame: Sendable {
  /// The `SparkSession` this `DataFrame` belongs to.
  var spark: SparkSession
  /// The logical plan describing this `DataFrame`'s computation.
  var plan: Plan
  /// Cached analyzed schema response from the `Spark Connect` server;
  /// `nil` until the plan has been analyzed (see `setSchema(_:)`).
  private var _schema: DataType? = nil
  /// `Apache Arrow` `RecordBatch`es accumulated from execution.
  private var batches: [RecordBatch] = [RecordBatch]()
34
34
35
35
/// Create a new `DataFrame`instance with the given Spark session and plan.
@@ -57,7 +57,7 @@ public actor DataFrame: Sendable {
57
57
/// Set the schema. This is used to store the analized schema response from `Spark Connect` server.
58
58
/// - Parameter schema: <#schema description#>
59
59
private func setSchema( _ schema: DataType ) {
60
- self . schema = schema
60
+ self . _schema = schema
61
61
}
62
62
63
63
/// Add `Apache Arrow`'s `RecordBatch`s to the internal array.
@@ -67,9 +67,10 @@ public actor DataFrame: Sendable {
67
67
}
68
68
69
69
/// Return the `SparkSession` of this `DataFrame`.
70
- /// - Returns: A `SparkSession`
71
- public func sparkSession( ) -> SparkSession {
72
- return self . spark
70
+ public var sparkSession : SparkSession {
71
+ get async throws {
72
+ return self . spark
73
+ }
73
74
}
74
75
75
76
/// A method to access the underlying Spark's `RDD`.
@@ -82,32 +83,35 @@ public actor DataFrame: Sendable {
82
83
}
83
84
84
85
/// Return an array of column name strings
85
- /// - Returns: a string array
86
- public func columns( ) async throws -> [ String ] {
87
- var columns : [ String ] = [ ]
88
- try await analyzePlanIfNeeded ( )
89
- for field in self . schema!. struct. fields {
90
- columns. append ( field. name)
86
+ public var columns : [ String ] {
87
+ get async throws {
88
+ var columns : [ String ] = [ ]
89
+ try await analyzePlanIfNeeded ( )
90
+ for field in self . _schema!. struct. fields {
91
+ columns. append ( field. name)
92
+ }
93
+ return columns
91
94
}
92
- return columns
93
95
}
94
96
95
97
  /// Return a `JSON` string of data type because we cannot expose the internal type ``DataType``.
  /// - Returns: a `JSON` string describing this `DataFrame`'s schema.
  public var schema: String {
    get async throws {
      // Lazily analyze the plan on first access; populates `_schema`.
      try await analyzePlanIfNeeded()
      // `_schema` is non-nil after a successful analyze.
      return try self._schema!.jsonString()
    }
  }
101
104
102
  /// Returns all column names and their data types as an array.
  /// - Returns: an array of `(columnName, simpleTypeString)` tuples.
  public var dtypes: [(String, String)] {
    get async throws {
      // Lazily analyze the plan on first access; populates `_schema`.
      try await analyzePlanIfNeeded()
      // `_schema` is non-nil after a successful analyze.
      return try self._schema!.struct.fields.map { ($0.name, try $0.dataType.simpleString) }
    }
  }
108
112
109
113
private func analyzePlanIfNeeded( ) async throws {
110
- if self . schema != nil {
114
+ if self . _schema != nil {
111
115
return
112
116
}
113
117
try await withGRPCClient (
@@ -224,7 +228,7 @@ public actor DataFrame: Sendable {
224
228
public func show( ) async throws {
225
229
try await execute ( )
226
230
227
- if let schema = self . schema {
231
+ if let schema = self . _schema {
228
232
var columns : [ TextTableColumn ] = [ ]
229
233
for f in schema. struct. fields {
230
234
columns. append ( TextTableColumn ( header: f. name) )
@@ -342,7 +346,7 @@ public actor DataFrame: Sendable {
342
346
return self
343
347
}
344
348
345
- var storageLevel : StorageLevel {
349
+ public var storageLevel : StorageLevel {
346
350
get async throws {
347
351
try await withGRPCClient (
348
352
transport: . http2NIOPosix(
@@ -403,7 +407,7 @@ public actor DataFrame: Sendable {
403
407
}
404
408
405
409
/// Returns a ``DataFrameWriter`` that can be used to write non-streaming data.
406
- var write : DataFrameWriter {
410
+ public var write : DataFrameWriter {
407
411
get {
408
412
return DataFrameWriter ( df: self )
409
413
}
0 commit comments