Skip to content

Commit 0032a60

Browse files
adam-richardson-openaierichwangfredliu-data
committed
Read metadata and protocol information from Delta checksum files
Compliant Delta writers may emit optional checksum files alongside commits containing metadata and protocol information. Instead of loading the latest checkpoint and replaying intervening commits (which can be expensive, especially for large v1 checkpoints), Trino can read the latest commit’s checksum file to obtain this information with a single listing and small JSON read. Ref. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file If the checksum file is missing or does not contain both metadata and protocol, we fall back to the existing Delta log scanning approach. Behavior is gated by session property load_metadata_from_checksum_file (defaulting to config delta.load_metadata_from_checksum_file, which defaults to true). Internal testing reduced analysis time for large v1-checkpoint tables from ~10s to <500ms. Co-authored-by: Eric Hwang <eh@openai.com> Co-authored-by: Fred Liu <fredliu@openai.com>
1 parent 85b609e commit 0032a60

File tree

60 files changed

+1176
-24
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1176
-24
lines changed

docs/src/main/sphinx/connector/delta-lake.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,11 @@ values. Typical usage does not require you to configure them.
200200
- Number of threads used for retrieving checkpoint files of each table. Currently, only
201201
retrievals of V2 Checkpoint's sidecar files are parallelized.
202202
- `4`
203+
* - `delta.load-metadata-from-checksum-file`
204+
- Use the Delta checksum metadata file (if available) to retrieve table
205+
metadata and protocol entries instead of scanning the transaction log. The
206+
equivalent catalog session property is `load_metadata_from_checksum_file`.
207+
- `true`
203208
:::
204209

205210
### Catalog session properties
@@ -233,6 +238,10 @@ The following table describes {ref}`catalog session properties
233238
- Read only projected fields from row columns while performing `SELECT`
234239
queries.
235240
- `true`
241+
* - `load_metadata_from_checksum_file`
242+
- Use the Delta checksum metadata file (if available) to retrieve table
243+
metadata and protocol entries instead of scanning the transaction log.
244+
- `true`
236245
:::
237246

238247
(delta-lake-fte-support)=

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public class DeltaLakeConfig
9595
private boolean deltaLogFileSystemCacheDisabled;
9696
private int metadataParallelism = 8;
9797
private int checkpointProcessingParallelism = 4;
98+
private boolean loadMetadataFromChecksumFile = true;
9899

99100
public Duration getMetadataCacheTtl()
100101
{
@@ -587,4 +588,17 @@ public DeltaLakeConfig setCheckpointProcessingParallelism(int checkpointProcessi
587588
this.checkpointProcessingParallelism = checkpointProcessingParallelism;
588589
return this;
589590
}
591+
592+
public boolean isLoadMetadataFromChecksumFile()
593+
{
594+
return loadMetadataFromChecksumFile;
595+
}
596+
597+
@Config("delta.load-metadata-from-checksum-file")
598+
@ConfigDescription("Use checksum metadata file (if available) for metadata and protocol entry retrieval, rather than scanning the log")
599+
public DeltaLakeConfig setLoadMetadataFromChecksumFile(boolean loadMetadataFromChecksumFile)
600+
{
601+
this.loadMetadataFromChecksumFile = loadMetadataFromChecksumFile;
602+
return this;
603+
}
590604
}

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 134 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,15 @@
6969
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
7070
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
7171
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
72+
import io.trino.plugin.deltalake.transactionlog.DeltaLakeVersionChecksum;
7273
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
7374
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
7475
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
7576
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
7677
import io.trino.plugin.deltalake.transactionlog.Transaction;
7778
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
7879
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
80+
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser.CommitVersionChecksumFileInfo;
7981
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
8082
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
8183
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
@@ -162,6 +164,7 @@
162164
import io.trino.spi.type.VarcharType;
163165

164166
import java.io.IOException;
167+
import java.io.UncheckedIOException;
165168
import java.net.URI;
166169
import java.net.URISyntaxException;
167170
import java.time.Duration;
@@ -241,6 +244,7 @@
241244
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
242245
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
243246
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
247+
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLoadMetadataFromChecksumFile;
244248
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
245249
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
246250
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
@@ -298,8 +302,10 @@
298302
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
299303
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
300304
import static io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil.findLatestVersionUsingTemporal;
305+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.findLatestCommitVersionChecksumFileInfo;
301306
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
302307
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
308+
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readVersionChecksumFile;
303309
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
304310
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
305311
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
@@ -480,6 +486,15 @@ private record QueriedTable(SchemaTableName schemaTableName, long version)
480486
}
481487
}
482488

489+
private record DeltaLakeTableDescriptor(long version, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
490+
{
491+
DeltaLakeTableDescriptor
492+
{
493+
requireNonNull(metadataEntry, "metadataEntry is null");
494+
requireNonNull(protocolEntry, "protocolEntry is null");
495+
}
496+
}
497+
483498
public DeltaLakeMetadata(
484499
DeltaLakeMetastore metastore,
485500
TransactionLogAccess transactionLogAccess,
@@ -717,27 +732,22 @@ public LocatedTableHandle getTableHandle(
717732

718733
String tableLocation = table.location();
719734
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
720-
TableSnapshot tableSnapshot = getSnapshot(session, table, endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor)));
721735

722-
MetadataAndProtocolEntries logEntries;
736+
DeltaLakeTableDescriptor descriptor;
723737
try {
724-
logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
738+
descriptor = loadDescriptor(session, tableName, table, fileSystem, tableLocation, startVersion, endVersion);
725739
}
726740
catch (TrinoException e) {
727741
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
728-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e);
742+
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), table.managed(), tableLocation, e);
729743
}
730744
throw e;
731745
}
732-
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
733-
if (metadataEntry == null) {
734-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
735-
}
736746

737-
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
738-
if (protocolEntry == null) {
739-
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
740-
}
747+
MetadataEntry metadataEntry = descriptor.metadataEntry();
748+
ProtocolEntry protocolEntry = descriptor.protocolEntry();
749+
long snapshotVersion = descriptor.version();
750+
741751
if (protocolEntry.minReaderVersion() > MAX_READER_VERSION) {
742752
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion());
743753
return null;
@@ -750,8 +760,8 @@ public LocatedTableHandle getTableHandle(
750760
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry));
751761
if (metadataScheduler.canStoreTableMetadata(session, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())) &&
752762
endVersion.isEmpty() &&
753-
!isSameTransactionVersion(metastoreTable.get(), tableSnapshot)) {
754-
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, tableSnapshot.getVersion(), metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
763+
!isSameTransactionVersion(metastoreTable.get(), snapshotVersion)) {
764+
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, snapshotVersion, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
755765
}
756766
return new DeltaLakeTableHandle(
757767
tableName.getSchemaName(),
@@ -767,10 +777,119 @@ public LocatedTableHandle getTableHandle(
767777
Optional.empty(),
768778
Optional.empty(),
769779
Optional.empty(),
770-
tableSnapshot.getVersion(),
780+
snapshotVersion,
771781
endVersion.isPresent());
772782
}
773783

784+
private DeltaLakeTableDescriptor loadDescriptor(ConnectorSession session, SchemaTableName tableName, DeltaMetastoreTable table, TrinoFileSystem fileSystem, String tableLocation, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
785+
{
786+
Optional<Long> endTableVersion = endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));
787+
788+
// Load descriptor from the latest checksum file, if enabled and available
789+
if (isLoadMetadataFromChecksumFile(session)) {
790+
Optional<Long> startTableVersion = startVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));
791+
792+
Optional<DeltaLakeTableDescriptor> descriptor = loadDescriptorFromChecksum(tableName, fileSystem, tableLocation, startTableVersion, endTableVersion);
793+
if (descriptor.isPresent()) {
794+
return descriptor.get();
795+
}
796+
}
797+
798+
// Fall back to scanning the transaction log if checksum file reading is disabled, if the latest checksum file is
799+
// missing, or if the checksum file does not capture the relevant information
800+
return loadDescriptorFromTransactionLog(session, table, fileSystem, endTableVersion);
801+
}
802+
803+
private Optional<DeltaLakeTableDescriptor> loadDescriptorFromChecksum(
804+
SchemaTableName tableName,
805+
TrinoFileSystem fileSystem,
806+
String tableLocation,
807+
Optional<Long> startTableVersion,
808+
Optional<Long> endTableVersion)
809+
{
810+
long latestEligibleCommit;
811+
812+
if (endTableVersion.isPresent()) {
813+
// Optimization: we already validated the existence of endTableVersion in getVersion, so endTableVersion is
814+
// definitionally the latest eligible commit. Attempt to read the latest checksum file directly without an
815+
// additional list operation
816+
latestEligibleCommit = endTableVersion.orElseThrow();
817+
}
818+
else {
819+
Optional<CommitVersionChecksumFileInfo> checksumFileInfo;
820+
try {
821+
checksumFileInfo = findLatestCommitVersionChecksumFileInfo(fileSystem, tableLocation, startTableVersion, endTableVersion);
822+
}
823+
catch (IOException | UncheckedIOException e) {
824+
// If we hit an IO-related error when determining the latest eligible commit, treat this as a hard failure;
825+
// falling back to scanning the Delta log is unlikely to help
826+
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed to determine latest commit version for " + tableName, e);
827+
}
828+
829+
if (checksumFileInfo.isEmpty()) {
830+
// If there are absolutely no commits in the specified range in the Delta log, fail fast to avoid an
831+
// additional useless scan over the log. For consistency, use the same error message as on Delta log scan
832+
// codepath
833+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableName);
834+
}
835+
836+
CommitVersionChecksumFileInfo info = checksumFileInfo.orElseThrow();
837+
if (!info.hasVersionChecksumFile()) {
838+
// If there exists a commit in the specified range, but there is no version checksum file available for that
839+
// commit, fall back to scanning the Delta log. Version checksum files are optional per the Delta spec
840+
return Optional.empty();
841+
}
842+
843+
latestEligibleCommit = info.version();
844+
}
845+
846+
Optional<DeltaLakeVersionChecksum> versionChecksum;
847+
try {
848+
versionChecksum = readVersionChecksumFile(fileSystem, tableLocation, latestEligibleCommit);
849+
}
850+
catch (IOException | UncheckedIOException e) {
851+
throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed to read checksum file for version %d of table %s", latestEligibleCommit, tableName), e);
852+
}
853+
854+
if (versionChecksum.isEmpty()) {
855+
// Nonexistent or structurally-invalid version checksum file; fall back to scanning the Delta log
856+
return Optional.empty();
857+
}
858+
859+
DeltaLakeVersionChecksum checksum = versionChecksum.orElseThrow();
860+
861+
MetadataEntry metadataEntry = checksum.getMetadata();
862+
ProtocolEntry protocolEntry = checksum.getProtocol();
863+
if (metadataEntry == null || protocolEntry == null) {
864+
// Version checksum file is missing critical information; fall back to scanning the Delta log
865+
return Optional.empty();
866+
}
867+
868+
return Optional.of(new DeltaLakeTableDescriptor(latestEligibleCommit, metadataEntry, protocolEntry));
869+
}
870+
871+
private DeltaLakeTableDescriptor loadDescriptorFromTransactionLog(
872+
ConnectorSession session,
873+
DeltaMetastoreTable table,
874+
TrinoFileSystem fileSystem,
875+
Optional<Long> endTableVersion)
876+
{
877+
TableSnapshot tableSnapshot = getSnapshot(session, table, endTableVersion);
878+
MetadataAndProtocolEntries logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
879+
880+
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
881+
if (metadataEntry == null) {
882+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable());
883+
}
884+
885+
ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
886+
if (protocolEntry == null) {
887+
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable());
888+
}
889+
890+
return new DeltaLakeTableDescriptor(tableSnapshot.getVersion(), metadataEntry, protocolEntry);
891+
}
892+
774893
@Override
775894
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
776895
{

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public final class DeltaLakeSessionProperties
7575
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
7676
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
7777
private static final String STORE_TABLE_METADATA = "store_table_metadata";
78+
private static final String LOAD_METADATA_FROM_CHECKSUM_FILE = "load_metadata_from_checksum_file";
7879

7980
private final List<PropertyMetadata<?>> sessionProperties;
8081

@@ -226,6 +227,11 @@ public DeltaLakeSessionProperties(
226227
"Require filter on partition column",
227228
deltaLakeConfig.isQueryPartitionFilterRequired(),
228229
false),
230+
booleanProperty(
231+
LOAD_METADATA_FROM_CHECKSUM_FILE,
232+
"Use checksum metadata file for metadata and protocol entry retrieval",
233+
deltaLakeConfig.isLoadMetadataFromChecksumFile(),
234+
false),
229235
booleanProperty(
230236
STORE_TABLE_METADATA,
231237
"Store table metadata in metastore",
@@ -344,6 +350,11 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
344350
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
345351
}
346352

353+
public static boolean isLoadMetadataFromChecksumFile(ConnectorSession session)
354+
{
355+
return session.getProperty(LOAD_METADATA_FROM_CHECKSUM_FILE, Boolean.class);
356+
}
357+
347358
public static boolean isStoreTableMetadataInMetastoreEnabled(ConnectorSession session)
348359
{
349360
return session.getProperty(STORE_TABLE_METADATA, Boolean.class);

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/metastore/DeltaLakeTableMetadataScheduler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,14 @@ public void stop()
213213
}
214214

215215
public static boolean isSameTransactionVersion(Table table, TableSnapshot snapshot)
216+
{
217+
return isSameTransactionVersion(table, snapshot.getVersion());
218+
}
219+
220+
public static boolean isSameTransactionVersion(Table table, long snapshotVersion)
216221
{
217222
return getLastTransactionVersion(table)
218-
.map(version -> version == snapshot.getVersion())
223+
.map(version -> version == snapshotVersion)
219224
.orElse(false);
220225
}
221226

0 commit comments

Comments
 (0)