Skip to content

Commit de2bcd7

Browse files
Read metadata and protocol information from Delta checksum files (when configured and available).
Compliant Delta writers may emit optional checksum files alongside commits containing metadata and protocol information. Instead of loading the latest checkpoint and replaying intervening commits (which can be expensive, especially for large v1 checkpoints), Trino can read the latest commit’s checksum file to obtain this information with a single listing and small JSON read. Ref. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file If the checksum file is missing or does not contain both metadata and protocol, we fall back to the existing Delta log scanning approach. Behavior is gated by session property load_metadata_from_checksum_file (defaulting to config delta.load_metadata_from_checksum_file, which defaults to true). Internal testing reduced analysis time for large v1-checkpoint tables from ~10s to <500ms.
1 parent 85b609e commit de2bcd7

File tree

14 files changed

+942
-57
lines changed

14 files changed

+942
-57
lines changed

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public class DeltaLakeConfig
9595
private boolean deltaLogFileSystemCacheDisabled;
9696
private int metadataParallelism = 8;
9797
private int checkpointProcessingParallelism = 4;
98+
private boolean loadMetadataFromChecksumFile = true;
9899

99100
public Duration getMetadataCacheTtl()
100101
{
@@ -587,4 +588,17 @@ public DeltaLakeConfig setCheckpointProcessingParallelism(int checkpointProcessi
587588
this.checkpointProcessingParallelism = checkpointProcessingParallelism;
588589
return this;
589590
}
591+
592+
public boolean isLoadMetadataFromChecksumFile()
593+
{
594+
return loadMetadataFromChecksumFile;
595+
}
596+
597+
@Config("delta.load_metadata_from_checksum_file")
598+
@ConfigDescription("Use checksum metadata file (if available) for metadata and protocol entry retrieval, rather than scanning the log")
599+
public DeltaLakeConfig setLoadMetadataFromChecksumFile(boolean loadMetadataFromChecksumFile)
600+
{
601+
this.loadMetadataFromChecksumFile = loadMetadataFromChecksumFile;
602+
return this;
603+
}
590604
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 119 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
7070
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
7171
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
72+
import io.trino.plugin.deltalake.transactionlog.DeltaLakeVersionChecksum;
7273
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
7374
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
7475
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
@@ -162,6 +163,7 @@
162163
import io.trino.spi.type.VarcharType;
163164

164165
import java.io.IOException;
166+
import java.io.UncheckedIOException;
165167
import java.net.URI;
166168
import java.net.URISyntaxException;
167169
import java.time.Duration;
@@ -241,6 +243,7 @@
241243
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
242244
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
243245
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
246+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLoadMetadataFromChecksumFile;
244247
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
245248
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
246249
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
@@ -298,8 +301,10 @@
298301
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
299302
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
300303
import static io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil.findLatestVersionUsingTemporal;
304+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getLatestCommitVersion;
301305
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
302306
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
307+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readVersionChecksumFile;
303308
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
304309
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
305310
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
@@ -480,6 +485,15 @@ private record QueriedTable(SchemaTableName schemaTableName, long version)
480485
}
481486
}
482487

488+
private record MetadataAndProtocolAndVersion(long version, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
489+
{
490+
MetadataAndProtocolAndVersion
491+
{
492+
requireNonNull(metadataEntry, "metadataEntry is null");
493+
requireNonNull(protocolEntry, "protocolEntry is null");
494+
}
495+
}
496+
483497
public DeltaLakeMetadata(
484498
DeltaLakeMetastore metastore,
485499
TransactionLogAccess transactionLogAccess,
@@ -717,27 +731,39 @@ public LocatedTableHandle getTableHandle(
717731

718732
String tableLocation = table.location();
719733
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
720-
TableSnapshot tableSnapshot = getSnapshot(session, table, endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor)));
734+
Optional<Long> endTableVersion = endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));
721735

722-
MetadataAndProtocolEntries logEntries;
723-
try {
724-
logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
725-
}
726-
catch (TrinoException e) {
727-
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
728-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e);
736+
Optional<MetadataAndProtocolAndVersion> metadataAndProtocol = Optional.empty();
737+
if (isLoadMetadataFromChecksumFile(session)) {
738+
Optional<Long> startTableVersion = startVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));
739+
740+
try {
741+
metadataAndProtocol = loadMetadataAndProtocolFromChecksum(tableName, fileSystem, tableLocation, startTableVersion, endTableVersion);
742+
}
743+
catch (TrinoException e) {
744+
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
745+
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e);
746+
}
747+
throw e;
729748
}
730-
throw e;
731-
}
732-
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
733-
if (metadataEntry == null) {
734-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
735749
}
736750

737-
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
738-
if (protocolEntry == null) {
739-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
751+
if (metadataAndProtocol.isEmpty()) {
752+
try {
753+
metadataAndProtocol = Optional.of(loadMetadataAndProtocolFromTransactionLog(session, table, fileSystem, endTableVersion));
754+
}
755+
catch (TrinoException e) {
756+
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
757+
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e);
758+
}
759+
throw e;
760+
}
740761
}
762+
MetadataAndProtocolAndVersion tableState = metadataAndProtocol.orElseThrow();
763+
MetadataEntry metadataEntry = tableState.metadataEntry();
764+
ProtocolEntry protocolEntry = tableState.protocolEntry();
765+
long snapshotVersion = tableState.version();
766+
741767
if (protocolEntry.minReaderVersion() > MAX_READER_VERSION) {
742768
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion());
743769
return null;
@@ -750,8 +776,8 @@ public LocatedTableHandle getTableHandle(
750776
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry));
751777
if (metadataScheduler.canStoreTableMetadata(session, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())) &&
752778
endVersion.isEmpty() &&
753-
!isSameTransactionVersion(metastoreTable.get(), tableSnapshot)) {
754-
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, tableSnapshot.getVersion(), metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
779+
!isSameTransactionVersion(metastoreTable.get(), snapshotVersion)) {
780+
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, snapshotVersion, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
755781
}
756782
return new DeltaLakeTableHandle(
757783
tableName.getSchemaName(),
@@ -767,10 +793,84 @@ public LocatedTableHandle getTableHandle(
767793
Optional.empty(),
768794
Optional.empty(),
769795
Optional.empty(),
770-
tableSnapshot.getVersion(),
796+
snapshotVersion,
771797
endVersion.isPresent());
772798
}
773799

800+
private Optional<MetadataAndProtocolAndVersion> loadMetadataAndProtocolFromChecksum(
801+
SchemaTableName tableName,
802+
TrinoFileSystem fileSystem,
803+
String tableLocation,
804+
Optional<Long> startTableVersion,
805+
Optional<Long> endTableVersion)
806+
{
807+
// If we can't fail to identify the latest commit in the specified range (either due to an IO-related error or a
808+
// total lack of commits), treat this as a hard failure. Falling back to scanning the log is unlikely to help
809+
810+
OptionalLong latestCommitVersion;
811+
812+
if (endTableVersion.isPresent()) {
813+
// Optimization: we already validated the existence of endTableVersion in getVersion, so endTableVersion is
814+
// definitionally the latest eligible commit
815+
latestCommitVersion = OptionalLong.of(endTableVersion.get());
816+
}
817+
else {
818+
try {
819+
latestCommitVersion = getLatestCommitVersion(fileSystem, tableLocation, startTableVersion, endTableVersion);
820+
}
821+
catch (IOException | UncheckedIOException e) {
822+
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to determine latest commit version for " + tableName, e);
823+
}
824+
}
825+
826+
if (latestCommitVersion.isEmpty()) {
827+
String startTableVersionMemo = startTableVersion.map(Object::toString).orElse("earliest");
828+
String endTableVersionMemo = endTableVersion.map(Object::toString).orElse("latest");
829+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, format("Delta table %s has no commits between %s and %s", tableName, startTableVersionMemo, endTableVersionMemo));
830+
}
831+
832+
// Valid Delta tables needn't have checksum files, and valid checksum files needn't have metadata and protocol --
833+
// in this case, we should fall back gracefully to scanning the Delta log
834+
835+
long latestChecksumVersion = latestCommitVersion.getAsLong();
836+
Optional<DeltaLakeVersionChecksum> versionChecksum = readVersionChecksumFile(fileSystem, tableLocation, latestChecksumVersion);
837+
if (versionChecksum.isEmpty()) {
838+
return Optional.empty();
839+
}
840+
841+
DeltaLakeVersionChecksum checksum = versionChecksum.get();
842+
843+
MetadataEntry metadataEntry = checksum.getMetadata();
844+
ProtocolEntry protocolEntry = checksum.getProtocol();
845+
if (metadataEntry == null || protocolEntry == null) {
846+
return Optional.empty();
847+
}
848+
849+
return Optional.of(new MetadataAndProtocolAndVersion(latestChecksumVersion, metadataEntry, protocolEntry));
850+
}
851+
852+
private MetadataAndProtocolAndVersion loadMetadataAndProtocolFromTransactionLog(
853+
ConnectorSession session,
854+
DeltaMetastoreTable table,
855+
TrinoFileSystem fileSystem,
856+
Optional<Long> endTableVersion)
857+
{
858+
TableSnapshot tableSnapshot = getSnapshot(session, table, endTableVersion);
859+
MetadataAndProtocolEntries logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
860+
861+
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
862+
if (metadataEntry == null) {
863+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable());
864+
}
865+
866+
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
867+
if (protocolEntry == null) {
868+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable());
869+
}
870+
871+
return new MetadataAndProtocolAndVersion(tableSnapshot.getVersion(), metadataEntry, protocolEntry);
872+
}
873+
774874
@Override
775875
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
776876
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public final class DeltaLakeSessionProperties
7575
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
7676
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
7777
private static final String STORE_TABLE_METADATA = "store_table_metadata";
78+
private static final String LOAD_METADATA_FROM_CHECKSUM_FILE = "load_metadata_from_checksum_file";
7879

7980
private final List<PropertyMetadata<?>> sessionProperties;
8081

@@ -226,6 +227,11 @@ public DeltaLakeSessionProperties(
226227
"Require filter on partition column",
227228
deltaLakeConfig.isQueryPartitionFilterRequired(),
228229
false),
230+
booleanProperty(
231+
LOAD_METADATA_FROM_CHECKSUM_FILE,
232+
"Use checksum metadata file for metadata and protocol entry retrieval",
233+
deltaLakeConfig.isLoadMetadataFromChecksumFile(),
234+
false),
229235
booleanProperty(
230236
STORE_TABLE_METADATA,
231237
"Store table metadata in metastore",
@@ -344,6 +350,11 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
344350
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
345351
}
346352

353+
public static boolean isLoadMetadataFromChecksumFile(ConnectorSession session)
354+
{
355+
return session.getProperty(LOAD_METADATA_FROM_CHECKSUM_FILE, Boolean.class);
356+
}
357+
347358
public static boolean isStoreTableMetadataInMetastoreEnabled(ConnectorSession session)
348359
{
349360
return session.getProperty(STORE_TABLE_METADATA, Boolean.class);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,14 @@ public void stop()
213213
}
214214

215215
public static boolean isSameTransactionVersion(Table table, TableSnapshot snapshot)
216+
{
217+
return isSameTransactionVersion(table, snapshot.getVersion());
218+
}
219+
220+
public static boolean isSameTransactionVersion(Table table, long snapshotVersion)
216221
{
217222
return getLastTransactionVersion(table)
218-
.map(version -> version == snapshot.getVersion())
223+
.map(version -> version == snapshotVersion)
219224
.orElse(false);
220225
}
221226

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.deltalake.transactionlog;
15+
16+
import com.fasterxml.jackson.annotation.JsonCreator;
17+
import com.fasterxml.jackson.annotation.JsonProperty;
18+
import jakarta.annotation.Nullable;
19+
20+
import java.util.Objects;
21+
22+
import static com.google.common.base.MoreObjects.toStringHelper;
23+
24+
// Ref. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file
25+
public class DeltaLakeVersionChecksum
26+
{
27+
private final MetadataEntry metadata;
28+
private final ProtocolEntry protocol;
29+
30+
@JsonCreator
31+
public DeltaLakeVersionChecksum(
32+
@JsonProperty("metadata") @Nullable MetadataEntry metadata,
33+
@JsonProperty("protocol") @Nullable ProtocolEntry protocol)
34+
{
35+
this.metadata = metadata;
36+
this.protocol = protocol;
37+
}
38+
39+
@Nullable
40+
@JsonProperty
41+
public MetadataEntry getMetadata()
42+
{
43+
return metadata;
44+
}
45+
46+
@Nullable
47+
@JsonProperty
48+
public ProtocolEntry getProtocol()
49+
{
50+
return protocol;
51+
}
52+
53+
@Override
54+
public boolean equals(Object o)
55+
{
56+
if (this == o) {
57+
return true;
58+
}
59+
if (o == null || getClass() != o.getClass()) {
60+
return false;
61+
}
62+
DeltaLakeVersionChecksum that = (DeltaLakeVersionChecksum) o;
63+
return Objects.equals(metadata, that.metadata) &&
64+
Objects.equals(protocol, that.protocol);
65+
}
66+
67+
@Override
68+
public int hashCode()
69+
{
70+
return Objects.hash(metadata, protocol);
71+
}
72+
73+
@Override
74+
public String toString()
75+
{
76+
return toStringHelper(this)
77+
.add("metadata", metadata)
78+
.add("protocol", protocol)
79+
.toString();
80+
}
81+
}

0 commit comments

Comments
 (0)