Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions docs/src/main/sphinx/connector/delta-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,28 @@ values. Typical usage does not require you to configure them.
is parallelized.
- `8`
* - `delta.checkpoint-processing.parallelism`
- Number of threads used for retrieving checkpoint files of each table. Currently, only
retrievals of V2 Checkpoint's sidecar files are parallelized.
- Number of threads used for processing Parquet checkpoint files for each
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the exhaustive description of the PR

Would it be possible to also publish a set of benchmarks, to give reviewers an impression of how effective this contribution is?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I recognize this patch can't really be fully evaluated without a deeper set of benchmarks! I'm in the process of driving some performance tests internally, and will share results when I have them

Please feel free to hold off on a deep review until those benchmarks are available

table
- `4`
* - `delta.checkpoint-processing.v1-parallel-processing.enabled`
- Enable parallel processing for V1 checkpoints
- `false`
* - `delta.checkpoint-processing.v2-parallel-processing.enabled`
- Enable fully parallel processing for V2 checkpoint sidecar files (by
default, sidecar files are opened in parallel but read serially)
- `false`
* - `delta.checkpoint-processing.intra-file-parallel-processing.enabled`
- Enable parallel processing of individual checkpoint Parquet files by
subdividing into splits
- `false`
* - `delta.checkpoint-processing.intra-file-parallel-processing.split-size`
- Target split size for parallel processing of individual checkpoint Parquet files
- `128MB`
* - `delta.load-metadata-from-checksum-file`
- Use the Delta checksum metadata file (if available) to retrieve table
metadata and protocol entries instead of scanning the transaction log. The
equivalent catalog session property is `load_metadata_from_checksum_file`.
- `true`
:::

### Catalog session properties
Expand Down Expand Up @@ -233,6 +252,10 @@ The following table describes {ref}`catalog session properties
- Read only projected fields from row columns while performing `SELECT`
queries.
- `true`
* - `load_metadata_from_checksum_file`
- Use the Delta checksum metadata file (if available) to retrieve table
metadata and protocol entries instead of scanning the transaction log.
- `true`
:::

(delta-lake-fte-support)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MaxDuration;
import io.airlift.units.MinDataSize;
import io.airlift.units.MinDuration;
import io.airlift.units.ThreadCount;
import io.trino.plugin.hive.HiveCompressionOption;
Expand Down Expand Up @@ -95,6 +96,11 @@ public class DeltaLakeConfig
private boolean deltaLogFileSystemCacheDisabled;
private int metadataParallelism = 8;
private int checkpointProcessingParallelism = 4;
private boolean checkpointV1ParallelProcessingEnabled;
private boolean checkpointV2ParallelProcessingEnabled;
private boolean checkpointIntraFileParallelProcessingEnabled;
private DataSize checkpointIntraFileParallelProcessingSplitSize = DataSize.of(128, MEGABYTE);
private boolean loadMetadataFromChecksumFile = true;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -580,11 +586,78 @@ public int getCheckpointProcessingParallelism()
return checkpointProcessingParallelism;
}

@ConfigDescription("Limits per table scan checkpoint files processing parallelism")
// Annotation order matches the other config setters in this class (@Config first, then @ConfigDescription)
@Config("delta.checkpoint-processing.parallelism")
@ConfigDescription("Limits per table scan checkpoint file processing parallelism for checkpoint Parquet files")
public DeltaLakeConfig setCheckpointProcessingParallelism(int checkpointProcessingParallelism)
{
    this.checkpointProcessingParallelism = checkpointProcessingParallelism;
    return this;
}

// Whether V1 checkpoint files are processed in parallel (default: false)
public boolean isCheckpointV1ParallelProcessingEnabled()
{
    return checkpointV1ParallelProcessingEnabled;
}

@Config("delta.checkpoint-processing.v1-parallel-processing.enabled")
@ConfigDescription("Enable parallel processing for v1 checkpoints")
public DeltaLakeConfig setCheckpointV1ParallelProcessingEnabled(boolean enabled)
{
    checkpointV1ParallelProcessingEnabled = enabled;
    return this;
}

// Whether V2 checkpoint sidecar files are read fully in parallel (default: false)
public boolean isCheckpointV2ParallelProcessingEnabled()
{
    return checkpointV2ParallelProcessingEnabled;
}

@Config("delta.checkpoint-processing.v2-parallel-processing.enabled")
@ConfigDescription("Enable fully parallel processing for v2 checkpoint sidecar files")
public DeltaLakeConfig setCheckpointV2ParallelProcessingEnabled(boolean enabled)
{
    checkpointV2ParallelProcessingEnabled = enabled;
    return this;
}

// Whether individual checkpoint Parquet files are subdivided into splits and processed in parallel (default: false)
public boolean isCheckpointIntraFileParallelProcessingEnabled()
{
    return checkpointIntraFileParallelProcessingEnabled;
}

@Config("delta.checkpoint-processing.intra-file-parallel-processing.enabled")
@ConfigDescription("Enable parallel processing of individual checkpoint Parquet files by subdividing them into splits")
public DeltaLakeConfig setCheckpointIntraFileParallelProcessingEnabled(boolean enabled)
{
    checkpointIntraFileParallelProcessingEnabled = enabled;
    return this;
}

// Target split size used when subdividing a checkpoint Parquet file; validated to be at least one byte
@NotNull
@MinDataSize("1B")
public DataSize getCheckpointIntraFileParallelProcessingSplitSize()
{
    return checkpointIntraFileParallelProcessingSplitSize;
}

@Config("delta.checkpoint-processing.intra-file-parallel-processing.split-size")
@ConfigDescription("Target split size for parallel processing of individual checkpoint Parquet files")
public DeltaLakeConfig setCheckpointIntraFileParallelProcessingSplitSize(DataSize splitSize)
{
    checkpointIntraFileParallelProcessingSplitSize = splitSize;
    return this;
}

// Whether metadata/protocol entries are read from the Delta checksum file instead of scanning the log (default: true)
public boolean isLoadMetadataFromChecksumFile()
{
    return loadMetadataFromChecksumFile;
}

@Config("delta.load-metadata-from-checksum-file")
@ConfigDescription("Use checksum metadata file (if available) for metadata and protocol entry retrieval, rather than scanning the log")
public DeltaLakeConfig setLoadMetadataFromChecksumFile(boolean enabled)
{
    loadMetadataFromChecksumFile = enabled;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.ColumnMappingMode;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.UnsupportedTypeException;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeVersionChecksum;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser.CommitVersionChecksumFileInfo;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
import io.trino.plugin.deltalake.transactionlog.checkpoint.MetadataAndProtocolEntries;
Expand Down Expand Up @@ -162,6 +164,7 @@
import io.trino.spi.type.VarcharType;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand Down Expand Up @@ -241,6 +244,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isCollectExtendedStatisticsColumnStatisticsOnWrite;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLoadMetadataFromChecksumFile;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isStoreTableMetadataInMetastoreEnabled;
Expand Down Expand Up @@ -298,8 +302,10 @@
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.DELTA_CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.transactionlog.MetadataEntry.configurationForNewTable;
import static io.trino.plugin.deltalake.transactionlog.TemporalTimeTravelUtil.findLatestVersionUsingTemporal;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.findLatestCommitVersionChecksumFileInfo;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.getMandatoryCurrentVersion;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readVersionChecksumFile;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
Expand Down Expand Up @@ -480,6 +486,15 @@ private record QueriedTable(SchemaTableName schemaTableName, long version)
}
}

// Minimal snapshot descriptor: the resolved table version together with the metadata and
// protocol entries needed to build a table handle, regardless of whether they came from
// a checksum file or from scanning the transaction log.
private record DeltaLakeTableDescriptor(long version, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
    DeltaLakeTableDescriptor
    {
        requireNonNull(metadataEntry, "metadataEntry is null");
        requireNonNull(protocolEntry, "protocolEntry is null");
    }
}

public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
TransactionLogAccess transactionLogAccess,
Expand Down Expand Up @@ -717,27 +732,22 @@ public LocatedTableHandle getTableHandle(

String tableLocation = table.location();
TrinoFileSystem fileSystem = fileSystemFactory.create(session, table);
TableSnapshot tableSnapshot = getSnapshot(session, table, endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor)));

MetadataAndProtocolEntries logEntries;
DeltaLakeTableDescriptor descriptor;
try {
logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);
descriptor = loadDescriptor(session, tableName, table, fileSystem, tableLocation, startVersion, endVersion);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, e);
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), table.managed(), tableLocation, e);
}
throw e;
}
MetadataEntry metadataEntry = logEntries.metadata().orElse(null);
if (metadataEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
}

ProtocolEntry protocolEntry = logEntries.protocol().orElse(null);
if (protocolEntry == null) {
return new CorruptedDeltaLakeTableHandle(tableName, table.catalogOwned(), managed, tableLocation, new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));
}
MetadataEntry metadataEntry = descriptor.metadataEntry();
ProtocolEntry protocolEntry = descriptor.protocolEntry();
long snapshotVersion = descriptor.version();

if (protocolEntry.minReaderVersion() > MAX_READER_VERSION) {
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.minReaderVersion());
return null;
Expand All @@ -750,8 +760,8 @@ public LocatedTableHandle getTableHandle(
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry, protocolEntry));
if (metadataScheduler.canStoreTableMetadata(session, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())) &&
endVersion.isEmpty() &&
!isSameTransactionVersion(metastoreTable.get(), tableSnapshot)) {
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, tableSnapshot.getVersion(), metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
!isSameTransactionVersion(metastoreTable.get(), snapshotVersion)) {
tableUpdateInfos.put(tableName, new TableUpdateInfo(session, snapshotVersion, metadataEntry.getSchemaString(), Optional.ofNullable(metadataEntry.getDescription())));
}
return new DeltaLakeTableHandle(
tableName.getSchemaName(),
Expand All @@ -767,10 +777,109 @@ public LocatedTableHandle getTableHandle(
Optional.empty(),
Optional.empty(),
Optional.empty(),
tableSnapshot.getVersion(),
snapshotVersion,
endVersion.isPresent());
}

/**
 * Resolves the table's version, metadata entry, and protocol entry, preferring the
 * checksum-file fast path when it is enabled for the session and usable for this table.
 */
private DeltaLakeTableDescriptor loadDescriptor(ConnectorSession session, SchemaTableName tableName, DeltaMetastoreTable table, TrinoFileSystem fileSystem, String tableLocation, Optional<ConnectorTableVersion> startVersion, Optional<ConnectorTableVersion> endVersion)
{
    Optional<Long> endTableVersion = endVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));

    // Checksum-based loading disabled: go straight to the transaction log
    if (!isLoadMetadataFromChecksumFile(session)) {
        return loadDescriptorFromTransactionLog(session, table, fileSystem, endTableVersion);
    }

    // The start version is only needed for the checksum lookup, so it is resolved here
    Optional<Long> startTableVersion = startVersion.map(version -> getVersion(session, fileSystem, tableLocation, version, metadataFetchingExecutor));

    // An empty result means the checksum file is missing or invalid; fall back to the transaction log
    return loadDescriptorFromChecksum(tableName, fileSystem, tableLocation, startTableVersion, endTableVersion)
            .orElseGet(() -> loadDescriptorFromTransactionLog(session, table, fileSystem, endTableVersion));
}

/**
 * Attempts to build a table descriptor from a Delta version checksum file, avoiding a
 * transaction log scan.
 *
 * Returns {@code Optional.empty()} when the caller should fall back to scanning the
 * transaction log: the latest eligible commit has no checksum file, the checksum file
 * could not be parsed, or it lacks metadata/protocol entries.
 *
 * @throws TrinoException DELTA_LAKE_INVALID_SCHEMA when no commit is found at all,
 *         or DELTA_LAKE_FILESYSTEM_ERROR when reading an existing checksum file fails
 */
private Optional<DeltaLakeTableDescriptor> loadDescriptorFromChecksum(
        SchemaTableName tableName,
        TrinoFileSystem fileSystem,
        String tableLocation,
        Optional<Long> startTableVersion,
        Optional<Long> endTableVersion)
{
    long latestEligibleCommit;

    if (endTableVersion.isPresent()) {
        // Optimization: we already validated the existence of endTableVersion in getVersion, so endTableVersion is
        // definitionally the latest eligible commit. Attempt to read the latest checksum file directly without an
        // additional list operation
        latestEligibleCommit = endTableVersion.orElseThrow();
    }
    else {
        Optional<CommitVersionChecksumFileInfo> checksumFileInfo;
        try {
            // Lists the transaction log to find the newest commit in range and whether it has a checksum file
            checksumFileInfo = findLatestCommitVersionChecksumFileInfo(fileSystem, tableLocation, startTableVersion, endTableVersion);
        }
        catch (IOException | UncheckedIOException e) {
            // Listing failures are treated as "checksum path unusable" and fall back to the log scan.
            // NOTE(review): this is more lenient than the read failure below, which throws — confirm intentional
            return Optional.empty();
        }

        // No commit found at all: the table's transaction log is empty/unusable, so falling back would not help
        if (checksumFileInfo.isEmpty()) {
            throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableName);
        }

        CommitVersionChecksumFileInfo info = checksumFileInfo.orElseThrow();
        // The newest commit has no accompanying checksum file; fall back to the log scan
        if (!info.hasVersionChecksumFile()) {
            return Optional.empty();
        }

        latestEligibleCommit = info.version();
    }

    Optional<DeltaLakeVersionChecksum> versionChecksum;
    try {
        versionChecksum = readVersionChecksumFile(fileSystem, tableLocation, latestEligibleCommit);
    }
    catch (IOException | UncheckedIOException e) {
        // A failure to read a checksum file we expected to exist is surfaced as a filesystem error
        throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed to read checksum file for version %d of table %s", latestEligibleCommit, tableName), e);
    }

    // Checksum file absent or unparsable at this version; fall back to the log scan
    if (versionChecksum.isEmpty()) {
        return Optional.empty();
    }

    DeltaLakeVersionChecksum checksum = versionChecksum.orElseThrow();

    // Checksum files may omit metadata/protocol entries; both are required to build a handle
    MetadataEntry metadataEntry = checksum.getMetadata();
    ProtocolEntry protocolEntry = checksum.getProtocol();
    if (metadataEntry == null || protocolEntry == null) {
        return Optional.empty();
    }

    return Optional.of(new DeltaLakeTableDescriptor(latestEligibleCommit, metadataEntry, protocolEntry));
}

/**
 * Builds a table descriptor by reading the metadata and protocol entries from the
 * transaction log snapshot — the fallback path when the checksum file cannot be used.
 *
 * @throws TrinoException DELTA_LAKE_INVALID_SCHEMA if either entry is missing from the log
 */
private DeltaLakeTableDescriptor loadDescriptorFromTransactionLog(
        ConnectorSession session,
        DeltaMetastoreTable table,
        TrinoFileSystem fileSystem,
        Optional<Long> endTableVersion)
{
    TableSnapshot tableSnapshot = getSnapshot(session, table, endTableVersion);
    MetadataAndProtocolEntries logEntries = transactionLogAccess.getMetadataAndProtocolEntry(session, fileSystem, tableSnapshot);

    MetadataEntry metadataEntry = logEntries.metadata()
            .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + tableSnapshot.getTable()));
    ProtocolEntry protocolEntry = logEntries.protocol()
            .orElseThrow(() -> new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + tableSnapshot.getTable()));

    return new DeltaLakeTableDescriptor(tableSnapshot.getVersion(), metadataEntry, protocolEntry);
}

@Override
public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class DeltaLakeSessionProperties
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String STORE_TABLE_METADATA = "store_table_metadata";
private static final String LOAD_METADATA_FROM_CHECKSUM_FILE = "load_metadata_from_checksum_file";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -226,6 +227,11 @@ public DeltaLakeSessionProperties(
"Require filter on partition column",
deltaLakeConfig.isQueryPartitionFilterRequired(),
false),
booleanProperty(
LOAD_METADATA_FROM_CHECKSUM_FILE,
"Use checksum metadata file for metadata and protocol entry retrieval",
deltaLakeConfig.isLoadMetadataFromChecksumFile(),
false),
booleanProperty(
STORE_TABLE_METADATA,
"Store table metadata in metastore",
Expand Down Expand Up @@ -344,6 +350,11 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

// Reads the load_metadata_from_checksum_file session property; the default comes from
// DeltaLakeConfig when the property is registered in the constructor.
public static boolean isLoadMetadataFromChecksumFile(ConnectorSession session)
{
    return session.getProperty(LOAD_METADATA_FROM_CHECKSUM_FILE, Boolean.class);
}

public static boolean isStoreTableMetadataInMetastoreEnabled(ConnectorSession session)
{
return session.getProperty(STORE_TABLE_METADATA, Boolean.class);
Expand Down
Loading