Skip to content

Commit 010a44c

Browse files
authored
[Delta] Extend signature for checkpoint function (delta-io#3830)
<!-- PR template boilerplate omitted; see https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md -->

## Description

A minor refactor of the checkpoint function:

* Change the signature from `checkpoint(snapshotToCheckpoint, tableIdentifierOpt: Option[TableIdentifier] = None)` to `checkpoint(snapshotToCheckpoint, catalogTable: Option[CatalogTable] = None)`.

## How was this patch tested?

Existing UTs.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent 959765a commit 010a44c

File tree

2 files changed: +4 −2 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 3 additions & 1 deletion
@@ -43,6 +43,7 @@ import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.FileFormat
@@ -300,12 +301,13 @@ trait Checkpoints extends DeltaLogging {
    */
  def checkpoint(
      snapshotToCheckpoint: Snapshot,
-     tableIdentifierOpt: Option[TableIdentifier] = None): Unit =
+     catalogTable: Option[CatalogTable] = None): Unit =
    recordDeltaOperation(this, "delta.checkpoint") {
      withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
        if (snapshotToCheckpoint.version < 0) {
          throw DeltaErrors.checkpointNonExistTable(dataPath)
        }
+       val tableIdentifierOpt = catalogTable.map(_.identifier)
        checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
      }
    }

spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala

Lines changed: 1 addition & 1 deletion
@@ -41,6 +41,6 @@ object CheckpointHook extends PostCommitHook {
       lastCheckpointHint = None,
       lastCheckpointProvider = Some(cp),
       tableIdentifierOpt = txn.catalogTable.map(_.identifier))
-    txn.deltaLog.checkpoint(snapshotToCheckpoint, txn.catalogTable.map(_.identifier))
+    txn.deltaLog.checkpoint(snapshotToCheckpoint, txn.catalogTable)
   }
 }

0 commit comments

Comments (0)