Skip to content

Commit 2011ffd

Browse files
authored
[Spark] Improve the API of the run method of PostCommitHook (delta-io#4314)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Currently, the `PostCommitHook::run` method is taking in a `Seq[Action]`. This means the actions must be materialized in memory before passing to this method, which may cause an OOM for a commit with a large number of actions. This PR changes the type to be an `Iterator[Action]` instead to allow future optimizations where the actions are read from a file on demand. This change also prompts a change in `GenerateSymlinkManifest` to make sure that the iterator is only used once. Some test refactoring for `DeltaGenerateSymlinkManifestSuite` is also included. ## How was this patch tested? Existing unit tests ## Does this PR introduce _any_ user-facing changes? No
1 parent 961c79c commit 2011ffd

File tree

11 files changed

+78
-44
lines changed

11 files changed

+78
-44
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,7 +1454,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
14541454
throw e
14551455
}
14561456

1457-
runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions)
1457+
runPostCommitHooks(version, postCommitSnapshot, actualCommittedActions.toIterator)
14581458

14591459
executionObserver.transactionCommitted()
14601460
Some(version)
@@ -2683,7 +2683,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
26832683
protected def runPostCommitHooks(
26842684
version: Long,
26852685
postCommitSnapshot: Snapshot,
2686-
committedActions: Seq[Action]): Unit = {
2686+
committedActions: Iterator[Action]): Unit = {
26872687
assert(committed, "Can't call post commit hooks before committing")
26882688

26892689
// Keep track of the active txn because hooks may create more txns and overwrite the active one.
@@ -2701,7 +2701,7 @@ trait OptimisticTransactionImpl extends DeltaTransaction
27012701
hook: PostCommitHook,
27022702
version: Long,
27032703
postCommitSnapshot: Snapshot,
2704-
committedActions: Seq[Action]): Unit = {
2704+
committedActions: Iterator[Action]): Unit = {
27052705
try {
27062706
hook.run(spark, this, version, postCommitSnapshot, committedActions)
27072707
} catch {

spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
102102
txn: DeltaTransaction,
103103
committedVersion: Long,
104104
postCommitSnapshot: Snapshot,
105-
actions: Seq[Action]): Unit = {
105+
actions: Iterator[Action]): Unit = {
106106
val conf = spark.sessionState.conf
107107
val autoCompactTypeOpt = getAutoCompactType(conf, postCommitSnapshot.metadata)
108108
// Skip Auto Compact if current transaction is not qualified or the table is not qualified

spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ object CheckpointHook extends PostCommitHook {
3030
txn: DeltaTransaction,
3131
committedVersion: Long,
3232
postCommitSnapshot: Snapshot,
33-
committedActions: Seq[Action]): Unit = {
33+
committedActions: Iterator[Action]): Unit = {
3434
if (!txn.needsCheckpoint) return
3535

3636
// Since the postCommitSnapshot isn't guaranteed to match committedVersion, we have to

spark/src/main/scala/org/apache/spark/sql/delta/hooks/ChecksumHook.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ object ChecksumHook extends PostCommitHook with DeltaLogging {
4343
txn: DeltaTransaction,
4444
committedVersion: Long,
4545
postCommitSnapshot: Snapshot,
46-
committedActions: Seq[Action]): Unit = {
46+
committedActions: Iterator[Action]): Unit = {
4747
// Only write the checksum if the postCommitSnapshot matches the version that was committed.
4848
if (postCommitSnapshot.version != committedVersion) return
4949
logInfo(

spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.delta.hooks
1919
// scalastyle:off import.ordering.noEmptyLine
2020
import java.net.URI
2121

22+
import scala.collection.mutable
23+
2224
import org.apache.spark.sql.delta._
2325
import org.apache.spark.sql.delta.ClassicColumnConversions._
2426
import org.apache.spark.sql.delta.actions._
@@ -74,7 +76,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
7476
txn: DeltaTransaction,
7577
committedVersion: Long,
7678
postCommitSnapshot: Snapshot,
77-
committedActions: Seq[Action]): Unit = {
79+
committedActions: Iterator[Action]): Unit = {
7880
generateIncrementalManifest(spark, txn, postCommitSnapshot, committedActions)
7981
}
8082

@@ -95,7 +97,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
9597
spark: SparkSession,
9698
txn: DeltaTransaction,
9799
currentSnapshot: Snapshot,
98-
actions: Seq[Action]): Unit = recordManifestGeneration(txn.deltaLog, full = false) {
100+
actions: Iterator[Action]): Unit = recordManifestGeneration(txn.deltaLog, full = false) {
99101

100102
import org.apache.spark.sql.delta.implicits._
101103

@@ -113,12 +115,15 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
113115

114116
// Find all the manifest partitions that need to be updated or deleted
115117
val (allFilesInUpdatedPartitions, nowEmptyPartitions) = if (partitionCols.nonEmpty) {
118+
val (addFiles, otherActions) = actions.partition(_.isInstanceOf[AddFile])
119+
val (removeFiles, _) = otherActions.partition(_.isInstanceOf[RemoveFile])
120+
116121
// Get the partitions where files were added
117-
val partitionsOfAddedFiles = actions.collect { case a: AddFile => a.partitionValues }.toSet
122+
val partitionsOfAddedFiles = addFiles.collect { case a: AddFile => a.partitionValues }.toSet
118123

119124
// Get the partitions where files were deleted
120125
val removedFileNames =
121-
spark.createDataset(actions.collect { case r: RemoveFile => r.path }).toDF("path")
126+
spark.createDataset(removeFiles.collect { case r: RemoveFile => r.path }.toSeq).toDF("path")
122127
val partitionValuesOfRemovedFiles =
123128
txn.snapshot.allFiles.join(removedFileNames, "path").select("partitionValues").persist()
124129
try {

spark/src/main/scala/org/apache/spark/sql/delta/hooks/HudiConverterHook.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object HudiConverterHook extends PostCommitHook with DeltaLogging {
3434
txn: DeltaTransaction,
3535
committedVersion: Long,
3636
postCommitSnapshot: Snapshot,
37-
committedActions: Seq[Action]): Unit = {
37+
committedActions: Iterator[Action]): Unit = {
3838
// Only convert to Hudi if the snapshot matches the version committed.
3939
// This is to skip converting the same actions multiple times - they'll be written out
4040
// by another commit anyways.

spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ object IcebergConverterHook extends PostCommitHook with DeltaLogging {
3535
txn: DeltaTransaction,
3636
committedVersion: Long,
3737
postCommitSnapshot: Snapshot,
38-
committedActions: Seq[Action]): Unit = {
38+
committedActions: Iterator[Action]): Unit = {
3939
// Only convert to Iceberg if the snapshot matches the version committed.
4040
// This is to skip converting the same actions multiple times - they'll be written out
4141
// by another commit anyways.

spark/src/main/scala/org/apache/spark/sql/delta/hooks/PostCommitHook.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ trait PostCommitHook {
4646
txn: DeltaTransaction,
4747
committedVersion: Long,
4848
postCommitSnapshot: Snapshot,
49-
committedActions: Seq[Action]): Unit
49+
committedActions: Iterator[Action]): Unit
5050

5151
/**
5252
* Handle any error caused while running the hook. By default, all errors are ignored as

spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ trait UpdateCatalogBase extends PostCommitHook with DeltaLogging {
6767
txn: DeltaTransaction,
6868
committedVersion: Long,
6969
postCommitSnapshot: Snapshot,
70-
actions: Seq[Action]): Unit = {
70+
actions: Iterator[Action]): Unit = {
7171
// There's a potential race condition here, where a newer commit has already triggered
7272
// this to run. That's fine.
7373
executeOnWrite(spark, postCommitSnapshot)

spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ trait DeltaErrorsSuiteBase
923923
override val name: String = "DummyPostCommitHook"
924924
override def run(
925925
spark: SparkSession, txn: DeltaTransaction, committedVersion: Long,
926-
postCommitSnapshot: Snapshot, committedActions: Seq[Action]): Unit = {}
926+
postCommitSnapshot: Snapshot, committedActions: Iterator[Action]): Unit = {}
927927
}, 0, "msg", null)
928928
}
929929
checkErrorMessage(e, Some("DELTA_POST_COMMIT_HOOK_FAILED"), Some("2DKD0"),
@@ -936,7 +936,7 @@ trait DeltaErrorsSuiteBase
936936
override val name: String = "DummyPostCommitHook"
937937
override def run(
938938
spark: SparkSession, txn: DeltaTransaction, committedVersion: Long,
939-
postCommitSnapshot: Snapshot, committedActions: Seq[Action]): Unit = {}
939+
postCommitSnapshot: Snapshot, committedActions: Iterator[Action]): Unit = {}
940940
}, 0, null, null)
941941
}
942942
checkErrorMessage(e, Some("DELTA_POST_COMMIT_HOOK_FAILED"), Some("2DKD0"),

0 commit comments

Comments
 (0)