Skip to content

Commit d1c5262

Browse files
authored
[Spark] Fix double-use iterator bug (delta-io#4367)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description A previous refactoring delta-io#4314 introduced a bug that may cause an iterator to be used more than once. This PR fix this issue. ## How was this patch tested? Existing tests ## Does this PR introduce _any_ user-facing changes? No
1 parent ef2d4a7 commit d1c5262

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,7 +1454,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
14541454
throw e
14551455
}
14561456

1457-
runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions.toIterator)
1457+
runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions)
14581458

14591459
executionObserver.transactionCommitted()
14601460
Some(version)
@@ -2683,15 +2683,16 @@ trait OptimisticTransactionImpl extends DeltaTransaction
26832683
protected def runPostCommitHooks(
26842684
version: Long,
26852685
postCommitSnapshot: Snapshot,
2686-
committedActions: Iterator[Action]): Unit = {
2686+
committedActions: Seq[Action]): Unit = {
26872687
assert(committed, "Can't call post commit hooks before committing")
26882688

26892689
// Keep track of the active txn because hooks may create more txns and overwrite the active one.
26902690
val activeCommit = OptimisticTransaction.getActive()
26912691
OptimisticTransaction.clearActive()
26922692

26932693
try {
2694-
postCommitHooks.foreach(runPostCommitHook(_, version, postCommitSnapshot, committedActions))
2694+
postCommitHooks.foreach(
2695+
runPostCommitHook(_, version, postCommitSnapshot, committedActions.toIterator))
26952696
} finally {
26962697
activeCommit.foreach(OptimisticTransaction.setActive)
26972698
}

0 commit comments

Comments
 (0)