Commit 85740df (parent 9e183e6)

[Spark] Fix MinorCompaction not including RemoveFile (delta-io#4894)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, if a RemoveFile in a table does not have the `deletionTimestamp` field for whatever reason, that action is not included in the compaction file, so state constructed from the compaction file still includes the removed files. This happens because we default `deletionTimestamp` to 0 in that case, and the log replay only keeps a `RemoveFile` whose `deletionTimestamp` is larger than 0.

This PR fixes the issue by changing the type of `minFileRetentionTimestamp` in `InMemoryLogReplay` to `Option[Long]`, so that callers can pass `None` to include all `RemoveFile`s, regardless of whether they have a `deletionTimestamp`. The main call-site change is in `MinorCompactionHook`; all other call sites can safely change from 0 to None since they don't care about tombstones.

## How was this patch tested?

New unit test.

## Does this PR introduce _any_ user-facing changes?

No.
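The behavior described above can be condensed into a small, self-contained sketch (purely illustrative; `Tombstone` and `keptTombstones` are hypothetical stand-ins, not the Delta classes): a missing `deletionTimestamp` defaults to 0, so a `> 0` retention filter silently drops the tombstone, while a retention of `None` keeps it.

```scala
// Illustrative sketch only -- Tombstone and keptTombstones are made-up stand-ins,
// not the Delta sources.
case class Tombstone(path: String, deletionTimestamp: Option[Long]) {
  // Mirrors how RemoveFile.delTimestamp falls back to 0 when deletionTimestamp is absent.
  def delTimestamp: Long = deletionTimestamp.getOrElse(0L)
}

def keptTombstones(
    removes: Seq[Tombstone],
    minFileRetentionTimestamp: Option[Long]): Seq[Tombstone] =
  minFileRetentionTimestamp match {
    case None => removes                                  // new option: keep every tombstone
    case Some(ts) => removes.filter(_.delTimestamp > ts)  // old behavior with ts = 0 drops
  }                                                       // tombstones without a timestamp

val removes = Seq(Tombstone("a.parquet", None), Tombstone("b.parquet", Some(42L)))
assert(keptTombstones(removes, Some(0L)).map(_.path) == Seq("b.parquet"))           // "a" lost
assert(keptTombstones(removes, None).map(_.path) == Seq("a.parquet", "b.parquet"))  // both kept
```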

File tree: 4 files changed, +80 -26 lines


spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala

Lines changed: 3 additions & 3 deletions
@@ -518,7 +518,7 @@ trait RecordChecksum extends DeltaLogging {
     // `minSetTransactionRetentionTimestamp` is set. So passing this as None here explicitly.
     // We can also ignore file retention because that only affects [[RemoveFile]] actions.
     val logReplay = new InMemoryLogReplay(
-      minFileRetentionTimestamp = 0,
+      minFileRetentionTimestamp = None,
       minSetTransactionRetentionTimestamp = None)
 
     logReplay.append(attemptVersion - 1, oldSetTransactions.toIterator)
@@ -547,7 +547,7 @@ trait RecordChecksum extends DeltaLogging {
 
     // We only work with DomainMetadata, so RemoveFile and SetTransaction retention don't matter.
     val logReplay = new InMemoryLogReplay(
-      minFileRetentionTimestamp = 0,
+      minFileRetentionTimestamp = None,
       minSetTransactionRetentionTimestamp = None)
 
     val threshold = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_DOMAIN_METADATAS_IN_CRC)
@@ -612,7 +612,7 @@ trait RecordChecksum extends DeltaLogging {
 
     // We only work with AddFile, so RemoveFile and SetTransaction retention don't matter.
     val logReplay = new InMemoryLogReplay(
-      minFileRetentionTimestamp = 0,
+      minFileRetentionTimestamp = None,
       minSetTransactionRetentionTimestamp = None)
 
     logReplay.append(attemptVersion - 1, oldAllFiles.map(normalizePath).toIterator)

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 1 addition & 1 deletion
@@ -511,7 +511,7 @@ class Snapshot(
       .mapPartitions { iter =>
         val state: LogReplay =
           new InMemoryLogReplay(
-            localMinFileRetentionTimestamp,
+            Some(localMinFileRetentionTimestamp),
             localMinSetTransactionRetentionTimestamp)
         state.append(0, iter.map(_.unwrap))
         state.checkpoint.map(_.wrap)

spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala

Lines changed: 13 additions & 11 deletions
@@ -24,7 +24,8 @@ import java.net.URI
  * of the table. The protocol for resolution is as follows:
  *  - The most recent [[AddFile]] and accompanying metadata for any `(path, dv id)` tuple wins.
  *  - [[RemoveFile]] deletes a corresponding [[AddFile]] and is retained as a
- *    tombstone until `minFileRetentionTimestamp` has passed.
+ *    tombstone until `minFileRetentionTimestamp` has passed. If `minFileRetentionTimestamp` is
+ *    None, all [[RemoveFile]] actions are retained.
  *    A [[RemoveFile]] "corresponds" to the [[AddFile]] that matches both the parquet file URI
  *    *and* the deletion vector's URI (if any).
  *  - The most recent version for any `appId` in a [[SetTransaction]] wins.
@@ -36,7 +37,7 @@ import java.net.URI
  * This class is not thread safe.
  */
 class InMemoryLogReplay(
-    minFileRetentionTimestamp: Long,
+    minFileRetentionTimestamp: Option[Long],
     minSetTransactionRetentionTimestamp: Option[Long]) extends LogReplay {
 
   import InMemoryLogReplay._
@@ -90,18 +91,19 @@ class InMemoryLogReplay(
   }
 
   private def getTombstones: Iterable[FileAction] = {
-    (cancelledRemoveFiles.values ++ activeRemoveFiles.values)
-      .filter(_.delTimestamp > minFileRetentionTimestamp)
-      .map(_.copy(dataChange = false))
+    val allRemovedFiles = cancelledRemoveFiles.values ++ activeRemoveFiles.values
+    val filteredRemovedFiles = minFileRetentionTimestamp match {
+      case None => allRemovedFiles
+      case Some(timestamp) => allRemovedFiles.filter(_.delTimestamp > timestamp)
+    }
+    filteredRemovedFiles.map(_.copy(dataChange = false))
   }
 
   private[delta] def getTransactions: Iterable[SetTransaction] = {
-    if (minSetTransactionRetentionTimestamp.isEmpty) {
-      transactions.values
-    } else {
-      transactions.values.filter { txn =>
-        txn.lastUpdated.exists(_ > minSetTransactionRetentionTimestamp.get)
-      }
+    minSetTransactionRetentionTimestamp match {
+      case None => transactions.values
+      case Some(timestamp) =>
+        transactions.values.filter { txn => txn.lastUpdated.exists(_ > timestamp) }
     }
   }
 
spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala

Lines changed: 63 additions & 11 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
-import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
+import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
@@ -44,14 +44,8 @@ class DeltaLogMinorCompactionSuite extends QueryTest
       startVersion: Long,
       endVersion: Long): Unit = {
     val deltaLog = DeltaLog.forTable(spark, tablePath)
-    CatalogOwnedTableUtils.populateTableCommitCoordinatorFromCatalog(
-      spark,
-      catalogTableOpt = None,
-      snapshot = deltaLog.unsafeVolatileSnapshot).foreach { tcc =>
-      tcc.backfillToVersion(endVersion)
-    }
     val logReplay = new InMemoryLogReplay(
-      minFileRetentionTimestamp = 0,
+      minFileRetentionTimestamp = None,
       minSetTransactionRetentionTimestamp = None)
     val hadoopConf = deltaLog.newDeltaHadoopConf()
 
@@ -187,7 +181,8 @@ class DeltaLogMinorCompactionSuite extends QueryTest
   def testSnapshotCreation(
       compactionWindows: Seq[(Long, Long)],
       checkpoints: Set[Int] = Set.empty,
-      postSetupFunc: Option[(DeltaLog => Unit)] = None,
+      postDataGenerationFunc: Option[DeltaLog => Unit] = None,
+      postSetupFunc: Option[DeltaLog => Unit] = None,
       expectedCompactedDeltas: Seq[CompactedDelta],
       expectedDeltas: Seq[Long],
       expectedCheckpoint: Long = -1L,
@@ -206,7 +201,19 @@ class DeltaLogMinorCompactionSuite extends QueryTest
     withTempDir { tmpDir =>
       val tableDir = tmpDir.getAbsolutePath
       generateData(tableDir, checkpoints)
-      val deltaLog = DeltaLog.forTable(spark, tableDir)
+
+      val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableDir)
+      // Ensure all commits are backfilled after data generation.
+      CatalogOwnedTableUtils.populateTableCommitCoordinatorFromCatalog(
+        spark,
+        catalogTableOpt = None,
+        snapshot = snapshot).foreach { tcc =>
+        tcc.backfillToVersion(snapshot.version)
+      }
+
+      // Data generation complete - run post data generation function
+      postDataGenerationFunc.foreach(_.apply(deltaLog))
+
       compactionWindows.foreach { case (startV, endV) =>
         minorCompactDeltaLog(tableDir, startV, endV)
       }
@@ -421,7 +428,6 @@ class DeltaLogMinorCompactionSuite extends QueryTest
     )
   }
 
-
   test("compacted deltas should not be used when there are holes in deltas") {
     testSnapshotCreation(
       compactionWindows = Seq((0, 2), (3, 5), (3, 6)),
@@ -443,6 +449,52 @@ class DeltaLogMinorCompactionSuite extends QueryTest
       Seq(DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT.defaultTablePropertyKey -> "false")
     )
   }
+
+  test("compacted deltas should include RemoveFiles that do not have deletionTimestamp") {
+    testSnapshotCreation(
+      compactionWindows = Seq((1, 3), (4, 6), (7, 10)),
+      checkpoints = Set.empty,
+      postDataGenerationFunc = Some(
+        (deltaLog: DeltaLog) => {
+          val hadoopConf = deltaLog.newDeltaHadoopConf()
+          // Remove deletionTimestamp from RemoveFile in versions 1 to 10.
+          (1 to 10).foreach { versionToRead =>
+            val file = FileNames.unsafeDeltaFile(deltaLog.logPath, versionToRead)
+            val actions = deltaLog.store.readAsIterator(file, hadoopConf).map(Action.fromJson)
+            val actionsWithoutDeletionTimestamp = actions
+              .map {
+                case r: RemoveFile if r.deletionTimestamp.isDefined =>
+                  r.copy(deletionTimestamp = None)
+                case ci: CommitInfo =>
+                  // The operationParameters were serialized with JsonMapSerializer, but there
+                  // is no corresponding JsonMapDeserializer when reading them back, so this
+                  // step is needed to ensure they can be serialized again.
+                  val parameters = Option(ci.operationParameters)
+                    .map(_.mapValues(JsonUtils.toJson(_)).toMap)
+                    .orNull
+                  ci.copy(operationParameters = parameters)
+                case other => other
+              }
+              .map(_.json)
+              .toSeq
+            // The iterator is already consumed above so that we don't run into the issue of
+            // overwriting the file while reading it.
+            deltaLog.store.write(
+              path = file,
+              actions = actionsWithoutDeletionTimestamp.toIterator,
+              overwrite = true,
+              hadoopConf = hadoopConf)
+          }
+        }
+      ),
+      expectedCompactedDeltas = Seq(
+        CompactedDelta((1, 3), numAdds = 1, numRemoves = 1, numMetadata = 0),
+        CompactedDelta((4, 6), numAdds = 11, numRemoves = 5, numMetadata = 0),
+        CompactedDelta((7, 10), numAdds = 8, numRemoves = 6, numMetadata = 1)
+      ),
+      expectedDeltas = Seq(0)
+    )
+  }
 }
 
 class DeltaLogMinorCompactionWithCatalogOwnedBatch1Suite
