Skip to content

Commit 8525490

Browse files
committed
add checksum complex
1 parent bbd9ea3 commit 8525490

File tree

9 files changed

+135
-7
lines changed

9 files changed

+135
-7
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/Table.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,16 @@ TransactionBuilder createTransactionBuilder(
143143
*/
144144
void checkpoint(Engine engine, long version)
145145
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException;
146+
147+
/**
148+
* Computes the checksum for the table at the given version and writes it as a checksum file.
149+
*
150+
* @param engine {@link Engine} instance to use.
151+
* @param version Version to compute and write the checksum for.
152+
* @throws TableNotFoundException if the table is not found
153+
* @throws IOException for any I/O error, e.g. while reading the table state or writing the
154+
*     checksum file.
155+
* @since 3.2.0
156+
*/
157+
void checksum(Engine engine, long version) throws TableNotFoundException, IOException;
146158
}

kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ enum PostCommitHookType {
4040
* transaction commits. This operation has a minimal latency with no requirement of reading
4141
* previous checkpoint or logs.
4242
*/
43-
CHECKSUM_SIMPLE
43+
CHECKSUM_SIMPLE,
44+
CHECKSUM_FULL
4445
}
4546

4647
/** Invokes the post commit operation whose implementation must be thread safe. */

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ public void checkpoint(Engine engine, long version)
134134
snapshotManager.checkpoint(engine, clock, version);
135135
}
136136

137+
@Override
138+
public void checksum(Engine engine, long version)
139+
throws TableNotFoundException, CheckpointAlreadyExistsException, IOException {
140+
snapshotManager.checksum(engine, version);
141+
}
142+
137143
@Override
138144
public TransactionBuilder createTransactionBuilder(
139145
Engine engine, String engineInfo, Operation operation) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.delta.kernel.internal.data.TransactionStateRow;
3434
import io.delta.kernel.internal.fs.Path;
3535
import io.delta.kernel.internal.hook.CheckpointHook;
36+
import io.delta.kernel.internal.hook.ChecksumFullHook;
3637
import io.delta.kernel.internal.hook.ChecksumSimpleHook;
3738
import io.delta.kernel.internal.metrics.TransactionMetrics;
3839
import io.delta.kernel.internal.metrics.TransactionReportImpl;
@@ -364,8 +365,14 @@ private TransactionCommitResult doCommit(
364365
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
365366
}
366367

367-
buildPostCommitCrcInfo(commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
368-
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));
368+
Optional<CRCInfo> postCommitCrcInfo =
369+
buildPostCommitCrcInfo(
370+
commitAsVersion, transactionMetrics.captureTransactionMetricsResult());
371+
if (postCommitCrcInfo.isPresent()) {
372+
postCommitHooks.add(new ChecksumSimpleHook(postCommitCrcInfo.get(), logPath));
373+
} else {
374+
postCommitHooks.add(new ChecksumFullHook(dataPath, commitAsVersion));
375+
}
369376

370377
return new TransactionCommitResult(commitAsVersion, postCommitHooks);
371378
} catch (FileAlreadyExistsException e) {

kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumWriter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException {
5151
engine
5252
.getJsonHandler()
5353
.writeJsonFileAtomically(
54-
newChecksumPath.toString(),
55-
singletonCloseableIterator(toRow(crcInfo)),
56-
false /* overwrite */);
54+
newChecksumPath.toString(), singletonCloseableIterator(toRow(crcInfo)), false);
5755
logger.info("Write checksum file `{}` succeeds", newChecksumPath);
5856
return null;
5957
},
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.hook;
17+
18+
import io.delta.kernel.Table;
19+
import io.delta.kernel.engine.Engine;
20+
import io.delta.kernel.hook.PostCommitHook;
21+
import io.delta.kernel.internal.fs.Path;
22+
import java.io.IOException;
23+
24+
public class ChecksumFullHook implements PostCommitHook {
25+
26+
private final Path tablePath;
27+
private final long checksumVersion;
28+
29+
public ChecksumFullHook(Path tablePath, long checksumVersion) {
30+
this.tablePath = tablePath;
31+
this.checksumVersion = checksumVersion;
32+
}
33+
34+
@Override
35+
public void threadSafeInvoke(Engine engine) throws IOException {
36+
Table.forPath(engine, tablePath.toString()).checksum(engine, checksumVersion);
37+
}
38+
39+
@Override
40+
public PostCommitHookType getType() {
41+
return PostCommitHookType.CHECKPOINT;
42+
}
43+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
2121
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
2222
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
23+
import static io.delta.kernel.internal.actions.SingleAction.CHECKPOINT_SCHEMA;
2324
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
2425
import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs;
2526
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
@@ -31,9 +32,12 @@
3132
import io.delta.kernel.exceptions.InvalidTableException;
3233
import io.delta.kernel.exceptions.TableNotFoundException;
3334
import io.delta.kernel.internal.*;
35+
import io.delta.kernel.internal.actions.AddFile;
3436
import io.delta.kernel.internal.actions.Metadata;
3537
import io.delta.kernel.internal.annotation.VisibleForTesting;
3638
import io.delta.kernel.internal.checkpoints.*;
39+
import io.delta.kernel.internal.checksum.CRCInfo;
40+
import io.delta.kernel.internal.checksum.ChecksumWriter;
3741
import io.delta.kernel.internal.fs.Path;
3842
import io.delta.kernel.internal.lang.ListUtils;
3943
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
@@ -48,6 +52,7 @@
4852
import java.nio.file.FileAlreadyExistsException;
4953
import java.util.*;
5054
import java.util.concurrent.atomic.AtomicReference;
55+
import java.util.concurrent.atomic.LongAdder;
5156
import java.util.stream.Collectors;
5257
import org.slf4j.Logger;
5358
import org.slf4j.LoggerFactory;
@@ -207,6 +212,48 @@ public void checkpoint(Engine engine, Clock clock, long version)
207212
}
208213
}
209214

215+
public void checksum(Engine engine, long version) throws IOException {
216+
ChecksumWriter checksumWriter = new ChecksumWriter(logPath);
217+
SnapshotImpl snapshot =
218+
(SnapshotImpl)
219+
getSnapshotAt(
220+
engine,
221+
version,
222+
SnapshotQueryContext.forVersionSnapshot(tablePath.toString(), version));
223+
if (snapshot.getCurrentCrcInfo().isPresent()) {
224+
checksumWriter.writeCheckSum(engine, snapshot.getCurrentCrcInfo().get());
225+
return;
226+
}
227+
228+
// TODO: Optimize using crc after https://github.com/delta-io/delta/pull/4112
229+
LongAdder tableSizeByte = new LongAdder();
230+
LongAdder fileCount = new LongAdder();
231+
CreateCheckpointIterator currentTableStateIterator =
232+
snapshot.getCreateCheckpointIterator(engine);
233+
currentTableStateIterator.forEachRemaining(
234+
batch ->
235+
batch
236+
.getRows()
237+
.forEachRemaining(
238+
// TODO: Collect domain metadata, ict etc.
239+
row -> {
240+
if (!row.isNullAt(CHECKPOINT_SCHEMA.indexOf("add"))) {
241+
tableSizeByte.add(
242+
new AddFile(row.getStruct(CHECKPOINT_SCHEMA.indexOf("add"))).getSize());
243+
fileCount.add(1);
244+
}
245+
}));
246+
checksumWriter.writeCheckSum(
247+
engine,
248+
new CRCInfo(
249+
version,
250+
snapshot.getMetadata(),
251+
snapshot.getProtocol(),
252+
tableSizeByte.longValue(),
253+
fileCount.longValue(),
254+
Optional.empty()));
255+
}
256+
210257
////////////////////
211258
// Helper Methods //
212259
////////////////////

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
5959

6060
checksumWriter.writeCheckSum(
6161
mockEngine(jsonHandler = jsonHandler),
62-
new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txn)
62+
new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txn),
63+
false
6364
)
6465

6566
verifyChecksumFile(jsonHandler, version)

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumSimpleComparisonSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ class ChecksumSimpleComparisonSuite extends AnyFunSuite with TestUtils {
115115
val kernelCrc = readCrcInfo(engine, kernelTablePath, version)
116116

117117
assertCrcInfoEquals(sparkCrc, kernelCrc)
118+
var checksumFullCrc = readCrcInfoFromChecksumFull(engine, kernelTablePath, version)
119+
assertCrcInfoEquals(sparkCrc, checksumFullCrc)
118120
}
119121

120122
private def readCrcInfo(engine: Engine, path: String, version: Long): CRCInfo = {
@@ -123,6 +125,17 @@ class ChecksumSimpleComparisonSuite extends AnyFunSuite with TestUtils {
123125
.orElseThrow(() => new IllegalStateException(s"CRC info not found for version $version"))
124126
}
125127

128+
/**
 * Deletes the CRC file for `version`, regenerates it through `Table.checksum`, and reads the
 * regenerated CRC info back.
 *
 * Throws `IllegalStateException` if no CRC info can be read after regeneration.
 */
private def readCrcInfoFromChecksumFull(engine: Engine, path: String, version: Long): CRCInfo = {
  // Remove the existing checksum file first so that the info read back below comes from the
  // file written by Table.checksum. The same engine is used for delete, write and read.
  engine.getFileSystemClient.delete(buildCrcPath(path, version).toString)
  Table
    .forPath(engine, path)
    .checksum(engine, version)
  ChecksumReader
    .getCRCInfo(engine, new Path(s"$path/_delta_log"), version, version)
    .orElseThrow(() => new IllegalStateException(s"CRC info not found for version $version"))
}
138+
126139
/** Builds the path of the `.crc` file for `version` under the table's `_delta_log` directory. */
private def buildCrcPath(basePath: String, version: Long): java.nio.file.Path = {
  // The version is zero-padded to 20 digits in the file name.
  val crcFile = new File(f"$basePath/_delta_log/$version%020d.crc")
  crcFile.toPath
}

0 commit comments

Comments
 (0)