diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index d373a5b..d3b9abd 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -151,6 +151,9 @@ public actor DataFrame: Sendable { /// Execute the plan and try to fill `schema` and `batches`. private func execute() async throws { + // Clear all existing batches. + self.batches.removeAll() + try await withGRPCClient( transport: .http2NIOPosix( target: .dns(host: spark.client.host, port: spark.client.port), diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift index 1839f76..45c4117 100644 --- a/Tests/SparkConnectTests/DataFrameTests.swift +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -311,6 +311,15 @@ struct DataFrameTests { await spark.stop() } + @Test + func collectMultiple() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let df = try await spark.range(1) + #expect(try await df.collect().count == 1) + #expect(try await df.collect().count == 1) + await spark.stop() + } + @Test func head() async throws { let spark = try await SparkSession.builder.getOrCreate()