Skip to content

Commit 080acae

Browse files
Parallelize Parquet checkpoint reads in the Delta Lake connector
Delta uses checkpoints to represent point-in-time snapshots of the table, e.g. the full set of active files. Most queries and write operations must load the checkpoint before proceeding. For large tables checkpoints can be extremely large, so checkpoint loading is important for overall query performance. This PR introduces full support for parallel reads of checkpoint files in Delta tables using a new `ParallelUnorderedIterator` that pulls from several `CheckpointEntryIterator` streams in parallel. New configuration properties: `delta.checkpoint-processing.v{1,2}-parallel-processing.enabled`, `delta.checkpoint-processing.intra-file-parallel-processing.enabled`, and `delta.checkpoint-processing.intra-file-parallel-processing.split-size`, controlling parallelization for v1/v2 checkpoints, intra-file (Parquet split) parallelization, and target split size, respectively. We reuse the existing `delta.checkpoint-processing.parallelism` to tune parallelism. Co-authored-by: Eric Hwang <eh@openai.com>
1 parent 12a5cd1 commit 080acae

17 files changed

+1950
-122
lines changed

docs/src/main/sphinx/connector/delta-lake.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,23 @@ values. Typical usage does not require you to configure them.
197197
is parallelized.
198198
- `8`
199199
* - `delta.checkpoint-processing.parallelism`
200-
- Number of threads used for retrieving checkpoint files of each table. Currently, only
201-
retrievals of V2 Checkpoint's sidecar files are parallelized.
200+
- Number of threads used for processing Parquet checkpoint files for each
201+
table
202202
- `4`
203+
* - `delta.checkpoint-processing.v1-parallel-processing.enabled`
204+
- Enable parallel processing for V1 checkpoints
205+
- `false`
206+
* - `delta.checkpoint-processing.v2-parallel-processing.enabled`
207+
- Enable fully parallel processing for V2 checkpoint sidecar files (by
208+
default, sidecar files are opened in parallel but read serially)
209+
- `false`
210+
* - `delta.checkpoint-processing.intra-file-parallel-processing.enabled`
211+
- Enable parallel processing of individual checkpoint Parquet files by
212+
subdividing into splits
213+
- `false`
214+
* - `delta.checkpoint-processing.intra-file-parallel-processing.split-size`
215+
- Target split size for parallel processing of individual checkpoint Parquet files
216+
- `128MB`
203217
* - `delta.load-metadata-from-checksum-file`
204218
- Use the Delta checksum metadata file (if available) to retrieve table
205219
metadata and protocol entries instead of scanning the transaction log. The

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.airlift.units.DataSize;
2323
import io.airlift.units.Duration;
2424
import io.airlift.units.MaxDuration;
25+
import io.airlift.units.MinDataSize;
2526
import io.airlift.units.MinDuration;
2627
import io.airlift.units.ThreadCount;
2728
import io.trino.plugin.hive.HiveCompressionOption;
@@ -95,6 +96,10 @@ public class DeltaLakeConfig
9596
private boolean deltaLogFileSystemCacheDisabled;
9697
private int metadataParallelism = 8;
9798
private int checkpointProcessingParallelism = 4;
99+
private boolean checkpointV1ParallelProcessingEnabled;
100+
private boolean checkpointV2ParallelProcessingEnabled;
101+
private boolean checkpointIntraFileParallelProcessingEnabled;
102+
private DataSize checkpointIntraFileParallelProcessingSplitSize = DataSize.of(128, MEGABYTE);
98103
private boolean loadMetadataFromChecksumFile = true;
99104

100105
public Duration getMetadataCacheTtl()
@@ -581,14 +586,68 @@ public int getCheckpointProcessingParallelism()
581586
return checkpointProcessingParallelism;
582587
}
583588

584-
@ConfigDescription("Limits per table scan checkpoint files processing parallelism")
589+
@ConfigDescription("Limits per table scan checkpoint file processing parallelism for checkpoint Parquet files")
585590
@Config("delta.checkpoint-processing.parallelism")
586591
public DeltaLakeConfig setCheckpointProcessingParallelism(int checkpointProcessingParallelism)
587592
{
588593
this.checkpointProcessingParallelism = checkpointProcessingParallelism;
589594
return this;
590595
}
591596

597+
public boolean isCheckpointV1ParallelProcessingEnabled()
598+
{
599+
return checkpointV1ParallelProcessingEnabled;
600+
}
601+
602+
@Config("delta.checkpoint-processing.v1-parallel-processing.enabled")
603+
@ConfigDescription("Enable parallel processing for v1 checkpoints")
604+
public DeltaLakeConfig setCheckpointV1ParallelProcessingEnabled(boolean checkpointV1ParallelProcessingEnabled)
605+
{
606+
this.checkpointV1ParallelProcessingEnabled = checkpointV1ParallelProcessingEnabled;
607+
return this;
608+
}
609+
610+
public boolean isCheckpointV2ParallelProcessingEnabled()
611+
{
612+
return checkpointV2ParallelProcessingEnabled;
613+
}
614+
615+
@Config("delta.checkpoint-processing.v2-parallel-processing.enabled")
616+
@ConfigDescription("Enable fully parallel processing for v2 checkpoint sidecar files")
617+
public DeltaLakeConfig setCheckpointV2ParallelProcessingEnabled(boolean checkpointV2ParallelProcessingEnabled)
618+
{
619+
this.checkpointV2ParallelProcessingEnabled = checkpointV2ParallelProcessingEnabled;
620+
return this;
621+
}
622+
623+
public boolean isCheckpointIntraFileParallelProcessingEnabled()
624+
{
625+
return checkpointIntraFileParallelProcessingEnabled;
626+
}
627+
628+
@Config("delta.checkpoint-processing.intra-file-parallel-processing.enabled")
629+
@ConfigDescription("Enable parallel processing of individual checkpoint Parquet files by subdividing them into splits")
630+
public DeltaLakeConfig setCheckpointIntraFileParallelProcessingEnabled(boolean checkpointIntraFileParallelProcessingEnabled)
631+
{
632+
this.checkpointIntraFileParallelProcessingEnabled = checkpointIntraFileParallelProcessingEnabled;
633+
return this;
634+
}
635+
636+
@NotNull
637+
@MinDataSize("1B")
638+
public DataSize getCheckpointIntraFileParallelProcessingSplitSize()
639+
{
640+
return checkpointIntraFileParallelProcessingSplitSize;
641+
}
642+
643+
@Config("delta.checkpoint-processing.intra-file-parallel-processing.split-size")
644+
@ConfigDescription("Target split size for parallel processing of individual checkpoint Parquet files")
645+
public DeltaLakeConfig setCheckpointIntraFileParallelProcessingSplitSize(DataSize checkpointIntraFileParallelProcessingSplitSize)
646+
{
647+
this.checkpointIntraFileParallelProcessingSplitSize = checkpointIntraFileParallelProcessingSplitSize;
648+
return this;
649+
}
650+
592651
public boolean isLoadMetadataFromChecksumFile()
593652
{
594653
return loadMetadataFromChecksumFile;

0 commit comments

Comments
 (0)