6969import io .trino .plugin .deltalake .transactionlog .DeltaLakeSchemaSupport .ColumnMappingMode ;
7070import io .trino .plugin .deltalake .transactionlog .DeltaLakeSchemaSupport .UnsupportedTypeException ;
7171import io .trino .plugin .deltalake .transactionlog .DeltaLakeTransactionLogEntry ;
72+ import io .trino .plugin .deltalake .transactionlog .DeltaLakeVersionChecksum ;
7273import io .trino .plugin .deltalake .transactionlog .MetadataEntry ;
7374import io .trino .plugin .deltalake .transactionlog .ProtocolEntry ;
7475import io .trino .plugin .deltalake .transactionlog .RemoveFileEntry ;
7576import io .trino .plugin .deltalake .transactionlog .TableSnapshot ;
7677import io .trino .plugin .deltalake .transactionlog .Transaction ;
7778import io .trino .plugin .deltalake .transactionlog .TransactionLogAccess ;
7879import io .trino .plugin .deltalake .transactionlog .TransactionLogEntries ;
80+ import io .trino .plugin .deltalake .transactionlog .TransactionLogParser .CommitVersionChecksumFileInfo ;
7981import io .trino .plugin .deltalake .transactionlog .checkpoint .CheckpointWriterManager ;
8082import io .trino .plugin .deltalake .transactionlog .checkpoint .LastCheckpoint ;
8183import io .trino .plugin .deltalake .transactionlog .checkpoint .MetadataAndProtocolEntries ;
162164import io .trino .spi .type .VarcharType ;
163165
164166import java .io .IOException ;
167+ import java .io .UncheckedIOException ;
165168import java .net .URI ;
166169import java .net .URISyntaxException ;
167170import java .time .Duration ;
241244import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .getHiveCatalogName ;
242245import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isCollectExtendedStatisticsColumnStatisticsOnWrite ;
243246import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isExtendedStatisticsEnabled ;
247+ import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isLoadMetadataFromChecksumFile ;
244248import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isProjectionPushdownEnabled ;
245249import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isQueryPartitionFilterRequired ;
246250import static io .trino .plugin .deltalake .DeltaLakeSessionProperties .isStoreTableMetadataInMetastoreEnabled ;
298302import static io .trino .plugin .deltalake .transactionlog .MetadataEntry .DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY ;
299303import static io .trino .plugin .deltalake .transactionlog .MetadataEntry .configurationForNewTable ;
300304import static io .trino .plugin .deltalake .transactionlog .TemporalTimeTravelUtil .findLatestVersionUsingTemporal ;
305+ import static io .trino .plugin .deltalake .transactionlog .TransactionLogParser .findLatestCommitVersionChecksumFileInfo ;
301306import static io .trino .plugin .deltalake .transactionlog .TransactionLogParser .getMandatoryCurrentVersion ;
302307import static io .trino .plugin .deltalake .transactionlog .TransactionLogParser .readLastCheckpoint ;
308+ import static io .trino .plugin .deltalake .transactionlog .TransactionLogParser .readVersionChecksumFile ;
303309import static io .trino .plugin .deltalake .transactionlog .TransactionLogUtil .getTransactionLogDir ;
304310import static io .trino .plugin .deltalake .transactionlog .TransactionLogUtil .getTransactionLogJsonEntryPath ;
305311import static io .trino .plugin .deltalake .transactionlog .checkpoint .TransactionLogTail .getEntriesFromJson ;
@@ -480,6 +486,15 @@ private record QueriedTable(SchemaTableName schemaTableName, long version)
480486 }
481487 }
482488
489+ private record DeltaLakeTableDescriptor (long version , MetadataEntry metadataEntry , ProtocolEntry protocolEntry )
490+ {
491+ DeltaLakeTableDescriptor
492+ {
493+ requireNonNull (metadataEntry , "metadataEntry is null" );
494+ requireNonNull (protocolEntry , "protocolEntry is null" );
495+ }
496+ }
497+
483498 public DeltaLakeMetadata (
484499 DeltaLakeMetastore metastore ,
485500 TransactionLogAccess transactionLogAccess ,
@@ -717,27 +732,22 @@ public LocatedTableHandle getTableHandle(
717732
718733 String tableLocation = table .location ();
719734 TrinoFileSystem fileSystem = fileSystemFactory .create (session , table );
720- TableSnapshot tableSnapshot = getSnapshot (session , table , endVersion .map (version -> getVersion (session , fileSystem , tableLocation , version , metadataFetchingExecutor )));
721735
722- MetadataAndProtocolEntries logEntries ;
736+ DeltaLakeTableDescriptor descriptor ;
723737 try {
724- logEntries = transactionLogAccess . getMetadataAndProtocolEntry (session , fileSystem , tableSnapshot );
738+ descriptor = loadDescriptor (session , tableName , table , fileSystem , tableLocation , startVersion , endVersion );
725739 }
726740 catch (TrinoException e ) {
727741 if (e .getErrorCode ().equals (DELTA_LAKE_INVALID_SCHEMA .toErrorCode ())) {
728- return new CorruptedDeltaLakeTableHandle (tableName , table .catalogOwned (), managed , tableLocation , e );
742+ return new CorruptedDeltaLakeTableHandle (tableName , table .catalogOwned (), table . managed () , tableLocation , e );
729743 }
730744 throw e ;
731745 }
732- MetadataEntry metadataEntry = logEntries .metadata ().orElse (null );
733- if (metadataEntry == null ) {
734- return new CorruptedDeltaLakeTableHandle (tableName , table .catalogOwned (), managed , tableLocation , new TrinoException (DELTA_LAKE_INVALID_SCHEMA , "Metadata not found in transaction log for " + tableSnapshot .getTable ()));
735- }
736746
737- ProtocolEntry protocolEntry = logEntries . protocol (). orElse ( null );
738- if ( protocolEntry == null ) {
739- return new CorruptedDeltaLakeTableHandle ( tableName , table . catalogOwned (), managed , tableLocation , new TrinoException ( DELTA_LAKE_INVALID_SCHEMA , "Protocol not found in transaction log for " + tableSnapshot . getTable ()) );
740- }
747+ MetadataEntry metadataEntry = descriptor . metadataEntry ( );
748+ ProtocolEntry protocolEntry = descriptor . protocolEntry ();
749+ long snapshotVersion = descriptor . version ( );
750+
741751 if (protocolEntry .minReaderVersion () > MAX_READER_VERSION ) {
742752 LOG .debug ("Skip %s because the reader version is unsupported: %d" , tableName , protocolEntry .minReaderVersion ());
743753 return null ;
@@ -750,8 +760,8 @@ public LocatedTableHandle getTableHandle(
750760 verifySupportedColumnMapping (getColumnMappingMode (metadataEntry , protocolEntry ));
751761 if (metadataScheduler .canStoreTableMetadata (session , metadataEntry .getSchemaString (), Optional .ofNullable (metadataEntry .getDescription ())) &&
752762 endVersion .isEmpty () &&
753- !isSameTransactionVersion (metastoreTable .get (), tableSnapshot )) {
754- tableUpdateInfos .put (tableName , new TableUpdateInfo (session , tableSnapshot . getVersion () , metadataEntry .getSchemaString (), Optional .ofNullable (metadataEntry .getDescription ())));
763+ !isSameTransactionVersion (metastoreTable .get (), snapshotVersion )) {
764+ tableUpdateInfos .put (tableName , new TableUpdateInfo (session , snapshotVersion , metadataEntry .getSchemaString (), Optional .ofNullable (metadataEntry .getDescription ())));
755765 }
756766 return new DeltaLakeTableHandle (
757767 tableName .getSchemaName (),
@@ -767,10 +777,109 @@ public LocatedTableHandle getTableHandle(
767777 Optional .empty (),
768778 Optional .empty (),
769779 Optional .empty (),
770- tableSnapshot . getVersion () ,
780+ snapshotVersion ,
771781 endVersion .isPresent ());
772782 }
773783
784+ private DeltaLakeTableDescriptor loadDescriptor (ConnectorSession session , SchemaTableName tableName , DeltaMetastoreTable table , TrinoFileSystem fileSystem , String tableLocation , Optional <ConnectorTableVersion > startVersion , Optional <ConnectorTableVersion > endVersion )
785+ {
786+ Optional <Long > endTableVersion = endVersion .map (version -> getVersion (session , fileSystem , tableLocation , version , metadataFetchingExecutor ));
787+
788+ if (isLoadMetadataFromChecksumFile (session )) {
789+ Optional <Long > startTableVersion = startVersion .map (version -> getVersion (session , fileSystem , tableLocation , version , metadataFetchingExecutor ));
790+
791+ Optional <DeltaLakeTableDescriptor > descriptor = loadDescriptorFromChecksum (tableName , fileSystem , tableLocation , startTableVersion , endTableVersion );
792+ if (descriptor .isPresent ()) {
793+ return descriptor .get ();
794+ }
795+ }
796+
797+ // Fall back to scanning the transaction log if checksum file reading is disabled or if the checksum file is missing
798+ // or invalid
799+ return loadDescriptorFromTransactionLog (session , table , fileSystem , endTableVersion );
800+ }
801+
802+ private Optional <DeltaLakeTableDescriptor > loadDescriptorFromChecksum (
803+ SchemaTableName tableName ,
804+ TrinoFileSystem fileSystem ,
805+ String tableLocation ,
806+ Optional <Long > startTableVersion ,
807+ Optional <Long > endTableVersion )
808+ {
809+ long latestEligibleCommit ;
810+
811+ if (endTableVersion .isPresent ()) {
812+ // Optimization: we already validated the existence of endTableVersion in getVersion, so endTableVersion is
813+ // definitionally the latest eligible commit. Attempt to read the latest checksum file directly without an
814+ // additional list operation
815+ latestEligibleCommit = endTableVersion .orElseThrow ();
816+ }
817+ else {
818+ Optional <CommitVersionChecksumFileInfo > checksumFileInfo ;
819+ try {
820+ checksumFileInfo = findLatestCommitVersionChecksumFileInfo (fileSystem , tableLocation , startTableVersion , endTableVersion );
821+ }
822+ catch (IOException | UncheckedIOException e ) {
823+ return Optional .empty ();
824+ }
825+
826+ if (checksumFileInfo .isEmpty ()) {
827+ throw new TrinoException (DELTA_LAKE_INVALID_SCHEMA , "Metadata not found in transaction log for " + tableName );
828+ }
829+
830+ CommitVersionChecksumFileInfo info = checksumFileInfo .orElseThrow ();
831+ if (!info .hasVersionChecksumFile ()) {
832+ return Optional .empty ();
833+ }
834+
835+ latestEligibleCommit = info .version ();
836+ }
837+
838+ Optional <DeltaLakeVersionChecksum > versionChecksum ;
839+ try {
840+ versionChecksum = readVersionChecksumFile (fileSystem , tableLocation , latestEligibleCommit );
841+ }
842+ catch (IOException | UncheckedIOException e ) {
843+ throw new TrinoException (DELTA_LAKE_FILESYSTEM_ERROR , format ("Failed to read checksum file for version %d of table %s" , latestEligibleCommit , tableName ), e );
844+ }
845+
846+ if (versionChecksum .isEmpty ()) {
847+ return Optional .empty ();
848+ }
849+
850+ DeltaLakeVersionChecksum checksum = versionChecksum .orElseThrow ();
851+
852+ MetadataEntry metadataEntry = checksum .getMetadata ();
853+ ProtocolEntry protocolEntry = checksum .getProtocol ();
854+ if (metadataEntry == null || protocolEntry == null ) {
855+ return Optional .empty ();
856+ }
857+
858+ return Optional .of (new DeltaLakeTableDescriptor (latestEligibleCommit , metadataEntry , protocolEntry ));
859+ }
860+
861+ private DeltaLakeTableDescriptor loadDescriptorFromTransactionLog (
862+ ConnectorSession session ,
863+ DeltaMetastoreTable table ,
864+ TrinoFileSystem fileSystem ,
865+ Optional <Long > endTableVersion )
866+ {
867+ TableSnapshot tableSnapshot = getSnapshot (session , table , endTableVersion );
868+ MetadataAndProtocolEntries logEntries = transactionLogAccess .getMetadataAndProtocolEntry (session , fileSystem , tableSnapshot );
869+
870+ MetadataEntry metadataEntry = logEntries .metadata ().orElse (null );
871+ if (metadataEntry == null ) {
872+ throw new TrinoException (DELTA_LAKE_INVALID_SCHEMA , "Metadata not found in transaction log for " + tableSnapshot .getTable ());
873+ }
874+
875+ ProtocolEntry protocolEntry = logEntries .protocol ().orElse (null );
876+ if (protocolEntry == null ) {
877+ throw new TrinoException (DELTA_LAKE_INVALID_SCHEMA , "Protocol not found in transaction log for " + tableSnapshot .getTable ());
878+ }
879+
880+ return new DeltaLakeTableDescriptor (tableSnapshot .getVersion (), metadataEntry , protocolEntry );
881+ }
882+
774883 @ Override
775884 public ConnectorTableProperties getTableProperties (ConnectorSession session , ConnectorTableHandle tableHandle )
776885 {
0 commit comments