Skip to content

Commit 19e1b35

Browse files
committed
[SPARK-51656] Support time for SparkSession
### What changes were proposed in this pull request? This PR aims to support `time` for `SparkSession`. ### Why are the changes needed? For feature parity with the Scala client. ```swift try await spark.time(spark.range(1000).count) try await spark.time(spark.range(1).collect) try await spark.time(spark.range(10).show) ``` ### Does this PR introduce _any_ user-facing change? No, this is a new addition to the unreleased version. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #33 from dongjoon-hyun/SPARK-51656. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 7639540 commit 19e1b35

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

Sources/SparkConnect/SparkSession.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
// under the License.
1818
//
1919

20+
import Dispatch
2021
import Foundation
2122
import GRPCCore
2223
import GRPCNIOTransportHTTP2
@@ -116,12 +117,26 @@ public actor SparkSession {
116117
return try await DataFrame(spark: self, sqlText: sqlText)
117118
}
118119

120+
/// Returns a ``DataFrameReader`` that can be used to read non-streaming data in as a
121+
/// `DataFrame`
119122
var read: DataFrameReader {
120123
get {
121124
return DataFrameReader(sparkSession: self)
122125
}
123126
}
124127

128+
/// Executes some code block and prints to stdout the time taken to execute the block.
129+
/// - Parameter f: A function to execute.
130+
/// - Returns: The result of the executed code.
131+
public func time<T: Sendable>(_ f: () async throws -> T) async throws -> T {
132+
let start = DispatchTime.now()
133+
let ret = try await f()
134+
let end = DispatchTime.now()
135+
let elapsed = (end.uptimeNanoseconds - start.uptimeNanoseconds) / 1_000_000
136+
print("Time taken: \(elapsed) ms")
137+
return ret
138+
}
139+
125140
/// This is defined as the return type of `SparkSession.sparkContext` method.
126141
/// This is an empty `Struct` type because `sparkContext` method is designed to throw
127142
/// `UNSUPPORTED_CONNECT_FEATURE.SESSION_SPARK_CONTEXT`.

Tests/SparkConnectTests/SparkSessionTests.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,15 @@ struct SparkSessionTests {
7474
#expect(try await spark.range(0, 100, 2).count() == 50)
7575
await spark.stop()
7676
}
77+
78+
@Test
79+
func time() async throws {
80+
let spark = try await SparkSession.builder.getOrCreate()
81+
#expect(try await spark.time(spark.range(1000).count) == 1000)
82+
#if !os(Linux)
83+
#expect(try await spark.time(spark.range(1).collect) == [["0"]])
84+
try await spark.time(spark.range(10).show)
85+
#endif
86+
await spark.stop()
87+
}
7788
}

0 commit comments

Comments
 (0)