Skip to content

Commit d1cd6d7

Browse files
committed
[SPARK-51837] Support inputFiles for DataFrame
### What changes were proposed in this pull request? This PR aims to support `inputFiles` API for `DataFrame`. ### Why are the changes needed? For feature parity. ### Does this PR introduce _any_ user-facing change? No, this is an addition to the unreleased version. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #67 from dongjoon-hyun/SPARK-51837. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 69a4ac4 commit d1cd6d7

File tree

4 files changed

+51
-0
lines changed

4 files changed

+51
-0
lines changed

Sources/SparkConnect/DataFrame.swift

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,10 +454,13 @@ public actor DataFrame: Sendable {
454454
}
455455
}
456456

457+
/// Prints the physical plan to the console for debugging purposes.
457458
public func explain() async throws {
458459
try await explain("simple")
459460
}
460461

462+
/// Prints the plans (logical and physical) to the console for debugging purposes.
463+
/// - Parameter extended: If `false`, prints only the physical plan.
461464
public func explain(_ extended: Bool) async throws {
462465
if (extended) {
463466
try await explain("extended")
@@ -466,6 +469,9 @@ public actor DataFrame: Sendable {
466469
}
467470
}
468471

472+
/// Prints the plans (logical and physical) with a format specified by a given explain mode.
473+
/// - Parameter mode: the expected output format of plans;
474+
/// `simple`, `extended`, `codegen`, `cost`, `formatted`.
469475
public func explain(_ mode: String) async throws {
470476
try await withGRPCClient(
471477
transport: .http2NIOPosix(
@@ -479,6 +485,23 @@ public actor DataFrame: Sendable {
479485
}
480486
}
481487

488+
/// Returns a best-effort snapshot of the files that compose this Dataset. This method simply
489+
/// asks each constituent BaseRelation for its respective files and takes the union of all
490+
/// results. Depending on the source relations, this may not find all input files. Duplicates are removed.
491+
/// - Returns: An array of file path strings.
492+
public func inputFiles() async throws -> [String] {
493+
try await withGRPCClient(
494+
transport: .http2NIOPosix(
495+
target: .dns(host: spark.client.host, port: spark.client.port),
496+
transportSecurity: .plaintext
497+
)
498+
) { client in
499+
let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
500+
let response = try await service.analyzePlan(spark.client.getInputFiles(spark.sessionID, plan))
501+
return response.inputFiles.files
502+
}
503+
}
504+
482505
/// Prints the schema to the console in a nice tree format.
483506
public func printSchema() async throws {
484507
try await printSchema(Int32.max)

Sources/SparkConnect/SparkConnectClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,17 @@ public actor SparkConnectClient {
307307
})
308308
}
309309

310+
func getInputFiles(_ sessionID: String, _ plan: Plan) async -> AnalyzePlanRequest
311+
{
312+
return analyze(
313+
sessionID,
314+
{
315+
var inputFiles = AnalyzePlanRequest.InputFiles()
316+
inputFiles.plan = plan
317+
return OneOf_Analyze.inputFiles(inputFiles)
318+
})
319+
}
320+
310321
func getTreeString(_ sessionID: String, _ plan: Plan, _ level: Int32) async -> AnalyzePlanRequest
311322
{
312323
return analyze(

Tests/SparkConnectTests/DataFrameReaderTests.swift

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,14 @@ struct DataFrameReaderTests {
108108
}
109109
await spark.stop()
110110
}
111+
112+
@Test
113+
func inputFiles() async throws {
114+
let spark = try await SparkSession.builder.getOrCreate()
115+
let path = "../examples/src/main/resources/users.orc"
116+
let answer = try await spark.read.format("orc").load(path).inputFiles()
117+
#expect(answer.count == 1)
118+
#expect(answer[0].hasSuffix("users.orc"))
119+
await spark.stop()
120+
}
111121
}

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ struct DataFrameTests {
118118
await spark.stop()
119119
}
120120

121+
@Test
122+
func inputFiles() async throws {
123+
let spark = try await SparkSession.builder.getOrCreate()
124+
#expect(try await spark.range(1).inputFiles().isEmpty)
125+
await spark.stop()
126+
}
127+
121128
@Test
122129
func count() async throws {
123130
let spark = try await SparkSession.builder.getOrCreate()

0 commit comments

Comments
 (0)