Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,38 @@ public actor DataFrame: Sendable {
return try await lastN.collect()
}

/// Returns true if the `collect` and `take` methods can be run locally
/// (without any Spark executors).
///
/// Opens a plaintext HTTP/2 gRPC connection to the host and port of this
/// session's client, sends an `analyzePlan` request built by `getIsLocal`
/// for this `DataFrame`'s plan, and reads the `isLocal` flag of the response.
/// - Returns: True if the plan is local.
/// - Throws: Any error raised while running the gRPC client or the `analyzePlan` call.
public func isLocal() async throws -> Bool {
  try await withGRPCClient(
    transport: .http2NIOPosix(
      target: .dns(host: spark.client.host, port: spark.client.port),
      transportSecurity: .plaintext
    )
  ) { grpcClient in
    let stub = Spark_Connect_SparkConnectService.Client(wrapping: grpcClient)
    // Build the request first, then issue the RPC and unwrap the nested flag.
    let request = await spark.client.getIsLocal(spark.sessionID, plan)
    return try await stub.analyzePlan(request).isLocal.isLocal
  }
}

/// Returns true if this `DataFrame` contains one or more sources that continuously return data
/// as it arrives.
///
/// Opens a plaintext HTTP/2 gRPC connection to the host and port of this
/// session's client, sends an `analyzePlan` request built by `getIsStreaming`
/// for this `DataFrame`'s plan, and reads the `isStreaming` flag of the response.
/// - Returns: True if the plan is streaming.
/// - Throws: Any error raised while running the gRPC client or the `analyzePlan` call.
public func isStreaming() async throws -> Bool {
  try await withGRPCClient(
    transport: .http2NIOPosix(
      target: .dns(host: spark.client.host, port: spark.client.port),
      transportSecurity: .plaintext
    )
  ) { grpcClient in
    let stub = Spark_Connect_SparkConnectService.Client(wrapping: grpcClient)
    // Build the request first, then issue the RPC and unwrap the nested flag.
    let request = await spark.client.getIsStreaming(spark.sessionID, plan)
    return try await stub.analyzePlan(request).isStreaming.isStreaming
  }
}

/// Checks if the ``DataFrame`` is empty and returns a boolean value.
/// - Returns: `true` if the ``DataFrame`` is empty, `false` otherwise.
public func isEmpty() async throws -> Bool {
Expand Down
20 changes: 20 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -556,4 +556,24 @@ public actor SparkConnectClient {
plan.opType = .root(relation)
return plan
}

/// Builds an `AnalyzePlanRequest` whose analysis payload is an `IsLocal` query for `plan`.
/// - Parameters:
///   - sessionID: Session identifier forwarded to `analyze`.
///   - plan: The plan attached to the `IsLocal` message.
/// - Returns: The request produced by `analyze`.
func getIsLocal(_ sessionID: String, _ plan: Plan) async -> AnalyzePlanRequest {
  analyze(sessionID) {
    var query = AnalyzePlanRequest.IsLocal()
    query.plan = plan
    return OneOf_Analyze.isLocal(query)
  }
}

/// Builds an `AnalyzePlanRequest` whose analysis payload is an `IsStreaming` query for `plan`.
/// - Parameters:
///   - sessionID: Session identifier forwarded to `analyze`.
///   - plan: The plan attached to the `IsStreaming` message.
/// - Returns: The request produced by `analyze`.
func getIsStreaming(_ sessionID: String, _ plan: Plan) async -> AnalyzePlanRequest {
  analyze(sessionID) {
    var query = AnalyzePlanRequest.IsStreaming()
    query.plan = plan
    return OneOf_Analyze.isStreaming(query)
  }
}
}
16 changes: 16 additions & 0 deletions Tests/SparkConnectTests/DataFrameTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,22 @@ struct DataFrameTests {
await spark.stop()
}

@Test
func isLocal() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  // These SQL statements are expected to report a local plan.
  for statement in ["SHOW DATABASES", "SHOW TABLES"] {
    #expect(try await spark.sql(statement).isLocal())
  }
  // A range DataFrame is expected to report a non-local plan.
  #expect(try await spark.range(1).isLocal() == false)
  await spark.stop()
}

@Test
func isStreaming() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  // A range DataFrame is expected to report a non-streaming plan.
  let streaming = try await spark.range(1).isStreaming()
  #expect(streaming == false)
  await spark.stop()
}

#if !os(Linux)
@Test
func sort() async throws {
Expand Down
Loading