Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public actor DataFrame: Sendable {
self.batches.append(contentsOf: batches)
}

/// Return the `SparkSession` of this `DataFrame`.
/// - Returns: A `SparkSession`
public func sparkSession() -> SparkSession {
return self.spark
}

/// A method to access the underlying Spark's `RDD`.
/// In `Spark Connect`, this feature is not allowed by design.
public func rdd() throws {
Expand Down
6 changes: 6 additions & 0 deletions Sources/SparkConnect/Extension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,9 @@ extension Data {
/// Get an `Int32` value from unsafe 4 bytes.
var int32: Int32 { withUnsafeBytes({ $0.load(as: Int32.self) }) }
}

extension SparkSession: Equatable {
public static func == (lhs: SparkSession, rhs: SparkSession) -> Bool {
return lhs.sessionID == rhs.sessionID
}
}
2 changes: 1 addition & 1 deletion Sources/SparkConnect/SparkSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public actor SparkSession {
}

/// A unique session ID for this session from client.
var sessionID: String = UUID().uuidString
nonisolated let sessionID: String = UUID().uuidString

/// Get the current session ID
/// - Returns: the current session ID
Expand Down
7 changes: 7 additions & 0 deletions Tests/SparkConnectTests/DataFrameTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import Testing

/// A test suite for `DataFrame`
struct DataFrameTests {
@Test
func sparkSession() async throws {
let spark = try await SparkSession.builder.getOrCreate()
#expect(try await spark.range(1).sparkSession() == spark)
await spark.stop()
}

@Test
func rdd() async throws {
let spark = try await SparkSession.builder.getOrCreate()
Expand Down
Loading