diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 80e5692..90eca9f 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -454,10 +454,13 @@ public actor DataFrame: Sendable {
     }
   }
 
+  /// Prints the physical plan to the console for debugging purposes.
   public func explain() async throws {
     try await explain("simple")
   }
 
+  /// Prints the plans (logical and physical) to the console for debugging purposes.
+  /// - Parameter extended: If `false`, prints only the physical plan.
   public func explain(_ extended: Bool) async throws {
     if (extended) {
       try await explain("extended")
@@ -466,6 +469,9 @@ public actor DataFrame: Sendable {
     }
   }
 
+  /// Prints the plans (logical and physical) with a format specified by a given explain mode.
+  /// - Parameter mode: the expected output format of plans;
+  ///   `simple`, `extended`, `codegen`, `cost`, `formatted`.
   public func explain(_ mode: String) async throws {
     try await withGRPCClient(
       transport: .http2NIOPosix(
@@ -479,6 +485,23 @@ public actor DataFrame: Sendable {
     }
   }
 
+  /// Returns a best-effort snapshot of the files that compose this Dataset. This method simply
+  /// asks each constituent BaseRelation for its respective files and takes the union of all
+  /// results. Depending on the source relations, this may not find all input files. Duplicates are removed.
+  /// - Returns: An array of file path strings.
+  public func inputFiles() async throws -> [String] {
+    try await withGRPCClient(
+      transport: .http2NIOPosix(
+        target: .dns(host: spark.client.host, port: spark.client.port),
+        transportSecurity: .plaintext
+      )
+    ) { client in
+      let service = Spark_Connect_SparkConnectService.Client(wrapping: client)
+      let response = try await service.analyzePlan(spark.client.getInputFiles(spark.sessionID, plan))
+      return response.inputFiles.files
+    }
+  }
+
   /// Prints the schema to the console in a nice tree format.
   public func printSchema() async throws {
     try await printSchema(Int32.max)
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 904e76e..5173c0e 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -307,6 +307,17 @@ public actor SparkConnectClient {
     })
   }
 
+  func getInputFiles(_ sessionID: String, _ plan: Plan) async -> AnalyzePlanRequest
+  {
+    return analyze(
+      sessionID,
+      {
+        var inputFiles = AnalyzePlanRequest.InputFiles()
+        inputFiles.plan = plan
+        return OneOf_Analyze.inputFiles(inputFiles)
+      })
+  }
+
   func getTreeString(_ sessionID: String, _ plan: Plan, _ level: Int32) async -> AnalyzePlanRequest
   {
     return analyze(
diff --git a/Tests/SparkConnectTests/DataFrameReaderTests.swift b/Tests/SparkConnectTests/DataFrameReaderTests.swift
index 78968ec..f1049b1 100644
--- a/Tests/SparkConnectTests/DataFrameReaderTests.swift
+++ b/Tests/SparkConnectTests/DataFrameReaderTests.swift
@@ -108,4 +108,14 @@ struct DataFrameReaderTests {
     }
     await spark.stop()
   }
+
+  @Test
+  func inputFiles() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let path = "../examples/src/main/resources/users.orc"
+    let answer = try await spark.read.format("orc").load(path).inputFiles()
+    #expect(answer.count == 1)
+    #expect(answer[0].hasSuffix("users.orc"))
+    await spark.stop()
+  }
 }
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index afd182c..b8e07b0 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -118,6 +118,13 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func inputFiles() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    #expect(try await spark.range(1).inputFiles().isEmpty)
+    await spark.stop()
+  }
+
   @Test
   func count() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
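
A minimal usage sketch of the new API, not part of the patch itself. It assumes a Spark Connect server is reachable through the default SparkSession.builder configuration and that the example ORC file exists at the path below (the same path the reader test uses); both are assumptions for illustration only.

import SparkConnect

// Connect with the default builder configuration (assumption: a Spark
// Connect server is running and reachable).
let spark = try await SparkSession.builder.getOrCreate()

// Load a file-based DataFrame; the path mirrors the test above and is
// illustrative only.
let df = try await spark.read.format("orc").load("../examples/src/main/resources/users.orc")

// Best-effort, de-duplicated snapshot of the files backing this DataFrame.
let files = try await df.inputFiles()
print(files)

// A plan with no file-based relation, such as range(1), yields an empty array.
print(try await spark.range(1).inputFiles().isEmpty)  // true

await spark.stop()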