From 0ebd1b2a5ed7e763236d045df53d80ba0a2baba3 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 20 Apr 2025 20:16:52 +0900 Subject: [PATCH] [SPARK-51850] Fix `DataFrame.execute` to reset previously received arrow batch data --- Sources/SparkConnect/DataFrame.swift | 3 +++ Tests/SparkConnectTests/DataFrameTests.swift | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift index d373a5b..d3b9abd 100644 --- a/Sources/SparkConnect/DataFrame.swift +++ b/Sources/SparkConnect/DataFrame.swift @@ -151,6 +151,9 @@ public actor DataFrame: Sendable { /// Execute the plan and try to fill `schema` and `batches`. private func execute() async throws { + // Clear all existing batches. + self.batches.removeAll() + try await withGRPCClient( transport: .http2NIOPosix( target: .dns(host: spark.client.host, port: spark.client.port), diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift index 1839f76..45c4117 100644 --- a/Tests/SparkConnectTests/DataFrameTests.swift +++ b/Tests/SparkConnectTests/DataFrameTests.swift @@ -311,6 +311,15 @@ struct DataFrameTests { await spark.stop() } + @Test + func collectMultiple() async throws { + let spark = try await SparkSession.builder.getOrCreate() + let df = try await spark.range(1) + #expect(try await df.collect().count == 1) + #expect(try await df.collect().count == 1) + await spark.stop() + } + @Test func head() async throws { let spark = try await SparkSession.builder.getOrCreate()