Commit 9875380

[SPARK-51850] Fix DataFrame.execute to reset previously received Arrow batch data
### What changes were proposed in this pull request?

This PR aims to fix `DataFrame.execute` to reset previously received Arrow batch data.

### Why are the changes needed?

To prevent wrong data duplication when repeating execution on the same `DataFrame`.

### Does this PR introduce _any_ user-facing change?

This is a bug fix.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #72 from dongjoon-hyun/SPARK-51850.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>

1 parent cb08c76 commit 9875380

2 files changed, 12 additions and 0 deletions

Sources/SparkConnect/DataFrame.swift

Lines changed: 3 additions & 0 deletions
@@ -151,6 +151,9 @@ public actor DataFrame: Sendable {
 
   /// Execute the plan and try to fill `schema` and `batches`.
   private func execute() async throws {
+    // Clear all existing batches.
+    self.batches.removeAll()
+
     try await withGRPCClient(
       transport: .http2NIOPosix(
         target: .dns(host: spark.client.host, port: spark.client.port),
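
The duplication this change guards against can be illustrated with a minimal, self-contained sketch. It is not the real `DataFrame` actor: `ToyFrame`, its `resetBeforeExecute` flag, and the hard-coded single-row batch are hypothetical stand-ins, and the gRPC call is replaced by a plain append so the example runs without a Spark Connect server.

```swift
// Hypothetical stand-in for the real DataFrame actor; all names here are
// illustrative and not part of the SparkConnect API.
final class ToyFrame {
  // Batches accumulated from previous executions (plays the role of `batches`).
  private var batches: [[Int]] = []
  private let resetBeforeExecute: Bool

  init(resetBeforeExecute: Bool) {
    self.resetBeforeExecute = resetBeforeExecute
  }

  /// Simulates one execution that receives a single batch with one row,
  /// similar in spirit to the result of `spark.range(1)`.
  private func execute() {
    if resetBeforeExecute {
      // Mirrors the fix: drop batches left over from an earlier execution.
      batches.removeAll()
    }
    batches.append([0])
  }

  /// Executes the plan again and returns every row currently stored.
  func collect() -> [Int] {
    execute()
    return batches.flatMap { $0 }
  }
}

// Without the reset, the second collect also returns the first run's row.
let buggy = ToyFrame(resetBeforeExecute: false)
let firstBuggy = buggy.collect().count
let secondBuggy = buggy.collect().count

// With the reset, each collect only returns the rows of its own execution.
let fixed = ToyFrame(resetBeforeExecute: true)
let firstFixed = fixed.collect().count
let secondFixed = fixed.collect().count

print(firstBuggy, secondBuggy, firstFixed, secondFixed)  // prints "1 2 1 1"
```

In this sketch the state is cleared at the start of `execute()` rather than after `collect()` returns, which matches where the commit places `removeAll()`: each execution starts from an empty buffer regardless of how the previous one ended.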

Tests/SparkConnectTests/DataFrameTests.swift

Lines changed: 9 additions & 0 deletions
@@ -311,6 +311,15 @@ struct DataFrameTests {
     await spark.stop()
   }
 
+  @Test
+  func collectMultiple() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.range(1)
+    #expect(try await df.collect().count == 1)
+    #expect(try await df.collect().count == 1)
+    await spark.stop()
+  }
+
   @Test
   func head() async throws {
     let spark = try await SparkSession.builder.getOrCreate()
