23 changes: 23 additions & 0 deletions Sources/SparkConnect/DataFrame.swift
@@ -454,10 +454,13 @@ public actor DataFrame: Sendable {
}
}

/// Prints the physical plan to the console for debugging purposes.
public func explain() async throws {
try await explain("simple")
}

/// Prints the plans (logical and physical) to the console for debugging purposes.
/// - Parameter extended: If `false`, prints only the physical plan.
public func explain(_ extended: Bool) async throws {
if (extended) {
try await explain("extended")
@@ -466,6 +469,9 @@ public actor DataFrame: Sendable {
}
}

/// Prints the plans (logical and physical) to the console in the format specified by the given
/// explain mode.
/// - Parameter mode: The expected output format of the plans;
///   one of `simple`, `extended`, `codegen`, `cost`, or `formatted`.
public func explain(_ mode: String) async throws {
try await withGRPCClient(
transport: .http2NIOPosix(
@@ -479,6 +485,23 @@ public actor DataFrame: Sendable {
}
}

/// Returns a best-effort snapshot of the files that compose this `DataFrame`. This method
/// simply asks each constituent `BaseRelation` for its respective files and takes the union
/// of all results. Depending on the source relations, this may not find all input files.
/// Duplicates are removed.
/// - Returns: An array of file path strings.
public func inputFiles() async throws -> [String] {
try await withGRPCClient(
transport: .http2NIOPosix(
target: .dns(host: spark.client.host, port: spark.client.port),
transportSecurity: .plaintext
)
) { client in
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
let response = try await service.analyzePlan(spark.client.getInputFiles(spark.sessionID, plan))
return response.inputFiles.files
}
}

/// Prints the schema to the console in a nice tree format.
public func printSchema() async throws {
try await printSchema(Int32.max)
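For context, a minimal usage sketch of the `explain` overloads and the new `inputFiles()` API, assuming a reachable Spark Connect server; the ORC path below is illustrative, not from this diff:

```swift
let spark = try await SparkSession.builder.getOrCreate()

// Print plans in the formats covered by the three explain overloads.
try await spark.range(1).explain()            // physical plan only
try await spark.range(1).explain(true)        // logical and physical plans
try await spark.range(1).explain("formatted") // explicit explain mode

// Best-effort list of files backing a file-based DataFrame
// (the path is hypothetical).
let files = try await spark.read.format("orc").load("/path/to/users.orc").inputFiles()
print(files)

await spark.stop()
```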
11 changes: 11 additions & 0 deletions Sources/SparkConnect/SparkConnectClient.swift
@@ -307,6 +307,17 @@ public actor SparkConnectClient {
})
}

func getInputFiles(_ sessionID: String, _ plan: Plan) async -> AnalyzePlanRequest
{
return analyze(
sessionID,
{
var inputFiles = AnalyzePlanRequest.InputFiles()
inputFiles.plan = plan
return OneOf_Analyze.inputFiles(inputFiles)
})
}

func getTreeString(_ sessionID: String, _ plan: Plan, _ level: Int32) async -> AnalyzePlanRequest
{
return analyze(
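The new builder mirrors `getTreeString` below it: each analyze variant wraps its payload in a `OneOf_Analyze` case. As a hypothetical illustration of the pattern (the `SameSemantics` message and its field names are assumed from the Spark Connect protobuf definitions, not part of this diff):

```swift
// Hypothetical sibling builder following the same OneOf_Analyze pattern.
// SameSemantics and its targetPlan/otherPlan fields are assumed from the
// Spark Connect protocol, not introduced by this change.
func getSameSemantics(_ sessionID: String, _ target: Plan, _ other: Plan) async -> AnalyzePlanRequest
{
  return analyze(
    sessionID,
    {
      var sameSemantics = AnalyzePlanRequest.SameSemantics()
      sameSemantics.targetPlan = target
      sameSemantics.otherPlan = other
      return OneOf_Analyze.sameSemantics(sameSemantics)
    })
}
```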
10 changes: 10 additions & 0 deletions Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -108,4 +108,14 @@ struct DataFrameReaderTests {
}
await spark.stop()
}

@Test
func inputFiles() async throws {
let spark = try await SparkSession.builder.getOrCreate()
let path = "../examples/src/main/resources/users.orc"
let answer = try await spark.read.format("orc").load(path).inputFiles()
#expect(answer.count == 1)
#expect(answer[0].hasSuffix("users.orc"))
await spark.stop()
}
}
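The test above resolves `users.orc` relative to the working directory, assuming the Apache Spark `examples/` tree sits alongside the checkout. A hypothetical companion test for the documented de-duplication behavior (the parquet directory path is assumed):

```swift
// Hypothetical companion test: a multi-file source should report each
// underlying file once. The directory path is illustrative only.
@Test
func inputFilesNoDuplicates() async throws {
  let spark = try await SparkSession.builder.getOrCreate()
  let path = "/tmp/multi_part_parquet"  // assumed to contain several part files
  let answer = try await spark.read.format("parquet").load(path).inputFiles()
  #expect(answer.count == Set(answer).count)  // duplicates are removed
  await spark.stop()
}
```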
7 changes: 7 additions & 0 deletions Tests/SparkConnectTests/DataFrameTests.swift
@@ -118,6 +118,13 @@ struct DataFrameTests {
await spark.stop()
}

@Test
func inputFiles() async throws {
let spark = try await SparkSession.builder.getOrCreate()
#expect(try await spark.range(1).inputFiles().isEmpty)
await spark.stop()
}

@Test
func count() async throws {
let spark = try await SparkSession.builder.getOrCreate()