Parallelize Parquet checkpoint reads in the Delta Lake connector#28470
Conversation
Compliant Delta writers may emit optional checksum files alongside commits, containing metadata and protocol information. Instead of loading the latest checkpoint and replaying intervening commits (which can be expensive, especially for large v1 checkpoints), Trino can read the latest commit's checksum file to obtain this information with a single listing and a small JSON read. Ref. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file

If the checksum file is missing or does not contain both metadata and protocol, we fall back to the existing Delta log scanning approach. Behavior is gated by the session property `load_metadata_from_checksum_file` (defaulting to config `delta.load_metadata_from_checksum_file`, which defaults to true). Internal testing reduced analysis time for large v1-checkpoint tables from ~10s to <500ms.

Co-authored-by: Eric Hwang <eh@openai.com>
Co-authored-by: Fred Liu <fredliu@openai.com>
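The fast path with fallback described above can be sketched roughly as follows. This is an illustration only: the `Crc` record, method names, and the `Supplier`-based replay hook are hypothetical stand-ins, not Trino's actual types or API.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative sketch of the checksum-file fast path with fallback.
// The Crc record and all names below are hypothetical, not Trino's API.
record Crc(Optional<String> metadata, Optional<String> protocol) {}

final class ChecksumFallbackSketch {
    // Use the checksum file only when it carries BOTH metadata and protocol;
    // otherwise fall back to the (expensive) transaction-log replay.
    static String loadMetadata(Optional<Crc> checksumFile, Supplier<String> replayTransactionLog) {
        return checksumFile
                .filter(crc -> crc.metadata().isPresent() && crc.protocol().isPresent())
                .flatMap(Crc::metadata)
                .orElseGet(replayTransactionLog);
    }
}
```

The key property is that the cheap path is taken only when the checksum file is fully usable; any deficiency routes through the existing (correct but slower) code path.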
Force-pushed from a4fd73a to b9af4cd
Delta uses checkpoints to represent point-in-time snapshots of the
table, e.g. the full set of active files. Most queries and write
operations must load the checkpoint before proceeding. For large tables
checkpoints can be extremely large, so checkpoint loading is important
for overall query performance.
This PR introduces full support for parallel reads of checkpoint files
in Delta tables using a new `ParallelUnorderedIterator` that pulls from
several `CheckpointEntryIterator` streams in parallel. New
configuration properties:
- `delta.checkpoint-processing.v{1,2}-parallel-processing.enabled`
- `delta.checkpoint-processing.intra-file-parallel-processing.enabled`
- `delta.checkpoint-processing.intra-file-parallel-processing.split-size`
controlling parallelization for v1/v2 checkpoints, intra-file (Parquet
split) parallelization, and target split size. We reuse the existing
`delta.checkpoint-processing.parallelism` property to tune parallelism.
Co-authored-by: Eric Hwang <eh@openai.com>
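The pull-from-several-streams shape this commit describes can be sketched as a minimal producer/consumer, under stated assumptions: class and method names below are illustrative only (not the PR's), and an unbounded queue stands in for the real bounded one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical sketch: each supplier's stream is drained by a background
// task into a shared queue; the consumer pulls until it has seen one
// completion marker per producer. Element order across sources is not
// preserved, matching the "unordered" contract.
final class ParallelUnorderedSketch {
    private static final Object DONE = new Object();

    @SuppressWarnings("unchecked")
    static <T> List<T> drainUnordered(List<Supplier<Stream<T>>> sources, ExecutorService executor)
            throws InterruptedException
    {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); // unbounded for simplicity
        for (Supplier<Stream<T>> source : sources) {
            executor.execute(() -> {
                try (Stream<T> stream = source.get()) { // defer stream creation to the worker thread
                    stream.forEach(queue::add);
                }
                finally {
                    queue.add(DONE); // completion signal; never blocks on an unbounded queue
                }
            });
        }
        List<T> results = new ArrayList<>();
        int remainingProducers = sources.size();
        while (remainingProducers > 0) {
            Object next = queue.take();
            if (next == DONE) {
                remainingProducers--;
            }
            else {
                results.add((T) next);
            }
        }
        return results;
    }
}
```

Passing `Supplier<Stream<T>>` rather than `Stream<T>` means any expensive stream setup happens on the worker, not the caller.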
Force-pushed from b9af4cd to 080acae
Pull request overview
This PR introduces parallel processing support for Delta Lake checkpoint files to improve query performance on large tables. The implementation includes new configuration properties to enable parallel processing of v1 and v2 checkpoints, as well as intra-file parallelism for individual Parquet checkpoint files. Additionally, the PR includes support for reading metadata from Delta checksum files (from PR #28381).
Changes:
- Introduced `ParallelUnorderedIterator` to parallelize checkpoint file reading across multiple streams
- Added configuration properties to control checkpoint processing parallelism (v1, v2, and intra-file)
- Implemented support for reading metadata and protocol information from Delta checksum files
- Refactored checkpoint processing logic in `TableSnapshot` to support parallel and split-based reading
Reviewed changes
Copilot reviewed 59 out of 71 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `ParallelUnorderedIterator.java` | New iterator class for parallel unordered stream processing |
| `DeltaLakeVersionChecksum.java` | Data class for Delta checksum file structure |
| `TransactionLogParser.java` | Added methods to find and read checksum files |
| `TableSnapshot.java` | Refactored checkpoint entry reading to support parallel processing and file splits |
| `CheckpointEntryIterator.java` | Added split support with start offset and length parameters |
| `DeltaLakeConfig.java` | Added 5 new configuration properties for checkpoint processing and checksum file reading |
| `DeltaLakeMetadata.java` | Implemented checksum file-based metadata loading with fallback to transaction log |
| Test files | Extensive test coverage including new test fixtures for checksum files |
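Assuming the property names land as described in this PR (and in #28381), a catalog configuration exercising all of them might look like the following. This is a hypothetical example for orientation, not shipped documentation; the thread count and split-size values are illustrative.

```properties
# Enable parallel processing of v1 and v2 checkpoints (default: false)
delta.checkpoint-processing.v1-parallel-processing.enabled=true
delta.checkpoint-processing.v2-parallel-processing.enabled=true

# Split individual Parquet checkpoint files and process splits in parallel
delta.checkpoint-processing.intra-file-parallel-processing.enabled=true
delta.checkpoint-processing.intra-file-parallel-processing.split-size=128MB

# Existing property: threads per table for checkpoint processing
delta.checkpoint-processing.parallelism=8

# Read metadata/protocol from the commit checksum file when available (default: true)
delta.load_metadata_from_checksum_file=true
```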
```java
Map<Boolean, List<DeltaLakeTransactionLogEntry>> partitionedV2CheckpointEntries =
        v2CheckpointEntries.collect(Collectors.partitioningBy(v2checkpointEntry -> v2checkpointEntry.getSidecar() != null));

List<DeltaLakeTransactionLogEntry> sidecarEntries = partitionedV2CheckpointEntries.get(true);
List<DeltaLakeTransactionLogEntry> nonSidecarEntries = partitionedV2CheckpointEntries.get(false);

List<TrinoInputFile> sidecarFiles = sidecarEntries.stream()
        .map(entry -> {
            SidecarEntry sidecarEntry = entry.getSidecar();
            Location sidecarPath = sidecarDirectoryPath.appendPath(sidecarEntry.path());
            return fileSystem.newInputFile(sidecarPath);
        })
        .collect(toImmutableList());

// Note: we could use sidecarEntry.sizeInBytes() to determine the sidecar file length, rather than checking the
// length directly from the file in computeSplits. This has the advantage of eliminating the need for I/O to check
// the length while planning. However, at this time, Trino will always fetch the length anyway in TrinoParquetDataSource,
// and the length is cached once fetched -- so, using sizeInBytes wouldn't save us any calls overall. Moreover,
// fetching the length at planning time allows us to write more precise and deterministic file system access tests
Function<TrinoInputFile, Supplier<Stream<CheckpointFileSplit>>> sidecarFileSplitSupplierBuilder =
        checkpointIntraFileParallelProcessingEnabled
                ? sidecarFile -> () -> computeSplits(sidecarFile, checkpointIntraFileParallelProcessingSplitSize)
                : sidecarFile -> () -> Stream.of(computeSingletonSplit(sidecarFile));

List<CheckpointFileSplit> sidecarFileSplits = ParallelUnorderedIterator.stream(
        sidecarFiles.stream().map(sidecarFileSplitSupplierBuilder).collect(toImmutableList()),
        checkpointProcessingExecutor).collect(toImmutableList());

Stream<DeltaLakeTransactionLogEntry> sidecarFileEntries = ParallelUnorderedIterator.stream(
        sidecarFileSplits.stream().map(sidecarFileSplitStreamBuilder).collect(toImmutableList()),
        checkpointProcessingExecutor,
        CHECKPOINT_FILE_PROCESSING_QUEUE_SIZE);

return Stream.concat(nonSidecarEntries.stream(), sidecarFileEntries);
```
The v2CheckpointEntries stream is consumed in line 558 (via collect) to partition the entries, which closes this stream. However, this stream is returned by getV2CheckpointEntries() which may contain resources (CheckpointEntryIterator) that need explicit closing. The stream gets consumed and closed during partitioning, but any resources attached to it may not be properly released.
Consider refactoring to ensure that the v2CheckpointEntries stream and its underlying resources are properly closed, possibly by using try-with-resources or ensuring onClose handlers are properly registered and invoked.
Suggested change:

```java
try (Stream<DeltaLakeTransactionLogEntry> checkpointEntries = v2CheckpointEntries) {
    Map<Boolean, List<DeltaLakeTransactionLogEntry>> partitionedV2CheckpointEntries =
            checkpointEntries.collect(Collectors.partitioningBy(v2checkpointEntry -> v2checkpointEntry.getSidecar() != null));
    List<DeltaLakeTransactionLogEntry> sidecarEntries = partitionedV2CheckpointEntries.get(true);
    List<DeltaLakeTransactionLogEntry> nonSidecarEntries = partitionedV2CheckpointEntries.get(false);
    List<TrinoInputFile> sidecarFiles = sidecarEntries.stream()
            .map(entry -> {
                SidecarEntry sidecarEntry = entry.getSidecar();
                Location sidecarPath = sidecarDirectoryPath.appendPath(sidecarEntry.path());
                return fileSystem.newInputFile(sidecarPath);
            })
            .collect(toImmutableList());
    // Note: we could use sidecarEntry.sizeInBytes() to determine the sidecar file length, rather than checking the
    // length directly from the file in computeSplits. This has the advantage of eliminating the need for I/O to check
    // the length while planning. However, at this time, Trino will always fetch the length anyway in TrinoParquetDataSource,
    // and the length is cached once fetched -- so, using sizeInBytes wouldn't save us any calls overall. Moreover,
    // fetching the length at planning time allows us to write more precise and deterministic file system access tests
    Function<TrinoInputFile, Supplier<Stream<CheckpointFileSplit>>> sidecarFileSplitSupplierBuilder =
            checkpointIntraFileParallelProcessingEnabled
                    ? sidecarFile -> () -> computeSplits(sidecarFile, checkpointIntraFileParallelProcessingSplitSize)
                    : sidecarFile -> () -> Stream.of(computeSingletonSplit(sidecarFile));
    List<CheckpointFileSplit> sidecarFileSplits = ParallelUnorderedIterator.stream(
            sidecarFiles.stream().map(sidecarFileSplitSupplierBuilder).collect(toImmutableList()),
            checkpointProcessingExecutor).collect(toImmutableList());
    Stream<DeltaLakeTransactionLogEntry> sidecarFileEntries = ParallelUnorderedIterator.stream(
            sidecarFileSplits.stream().map(sidecarFileSplitStreamBuilder).collect(toImmutableList()),
            checkpointProcessingExecutor,
            CHECKPOINT_FILE_PROCESSING_QUEUE_SIZE);
    return Stream.concat(nonSidecarEntries.stream(), sidecarFileEntries);
}
```
```diff
 * - `delta.checkpoint-processing.parallelism`
-  - Number of threads used for retrieving checkpoint files of each table. Currently, only
-    retrievals of V2 Checkpoint's sidecar files are parallelized.
+  - Number of threads used for processing Parquet checkpoint files for each
```
Thank you for the exhaustive description of the PR!
Would it be possible to also publish a set of benchmarks, to give reviewers an impression of how effective this contribution is?
Yes, I recognize this patch can't really be fully evaluated without a deeper set of benchmarks! I'm in the process of driving some performance tests internally, and will share results when I have them.
Please feel free to hold off on a deep review until those benchmarks are available.
```java
        int queueCapacity)
{
    this.executor = requireNonNull(executor, "executor is null");
    queue = new DynamicSizeBoundQueue<>(queueCapacity, _ -> 1);
```
Is `ArrayBlockingQueue` also a suitable alternative?
The `_ -> 1` stands out while reading this code.
Yes, I acknowledge this is a bit weird.

The benefit of `DynamicSizeBoundQueue` is that it supports an infallible, non-blocking `forcePut`, which allows us to reliably push a completion or error signal through the queue across threads without blocking, failing, or hitting an interrupt. This keeps the consumer side simple: we can just pull from a single queue, without needing multiple different cross-thread signaling mechanisms (which can get complex and fragile pretty quickly).

`ArrayBlockingQueue` doesn't support this out of the box, but I think it's likely possible to achieve the same property by building a lightweight wrapper around `ArrayBlockingQueue`. I'm happy to attempt that refactor if preferred.
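For illustration, such a wrapper might look roughly like this. This is a hypothetical sketch, not the PR's `DynamicSizeBoundQueue`: a capacity-bound `offer` for data elements, plus an always-succeeding `forcePut` reserved for terminal completion/error signals.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of a bound queue whose forcePut ignores the capacity
// limit, so a producer can always push a terminal signal without blocking,
// failing, or being interrupted -- even when the queue is full.
final class SignalingQueue<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;

    SignalingQueue(int capacity) {
        this.capacity = capacity;
    }

    // Regular insertion: rejected when at capacity (caller retries or backs off)
    synchronized boolean offer(T element) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(element);
        return true;
    }

    // Infallible, non-blocking insertion, used only for completion/error markers
    synchronized void forcePut(T element) {
        queue.add(element);
    }

    // Non-blocking removal; returns null when empty
    synchronized T poll() {
        return queue.poll();
    }
}
```

Because only one terminal marker per producer goes through `forcePut`, the queue can exceed its nominal capacity by at most the number of producers.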
```java
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.deltalake.transactionlog.checkpoint;
```
@findepi could you pls review the `ParallelUnorderedIterator` (and associated `TestParallelUnorderedIterator`) class? 🙏
Asking because I see similarities to https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Converting this PR to draft for now. I'd like to manage both reviewer bandwidth and my personal bandwidth more effectively while #28381 remains in progress.
Branched off of #28381, which I anticipate will merge soon. Please ignore the first commit at this time.
Description
Parallelize Parquet checkpoint reads in the Delta Lake connector
Delta uses checkpoints to represent point-in-time snapshots of the table -- notably, the full set of active files. Readers and writers of Delta tables can reason about the current state of the table by loading the latest checkpoint and reading all the intervening commits.
Most queries and write operations must load the checkpoint either in full or in part before proceeding. For large tables, the checkpoints can grow to be extremely large, so checkpoint loading is important for overall query performance.
This PR introduces full support for parallel reads of checkpoint files in Delta tables. We introduce configuration properties `delta.checkpoint-processing.v{1,2}-parallel-processing.enabled` that enable parallel processing for v1 and v2 checkpoints, respectively. Additionally, we introduce `delta.checkpoint-processing.intra-file-parallel-processing.enabled`, which enables splitting and parallel processing of individual checkpoint Parquet files, and `delta.checkpoint-processing.intra-file-parallel-processing.split-size`, controlling the goal split size for intra-file parallel processing (default 128 MiB). All new boolean configurations default to off to minimize the risk of performance regression or resource exhaustion on upgrade. The existing `delta.checkpoint-processing.parallelism` configuration property controls parallelism per table.

The exact shape and degree of parallelism depends on both the configuration and the structure of the checkpoint.
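The intra-file splitting can be illustrated with a simple target-size computation. This is a hypothetical sketch: the PR's actual `computeSplits` works against `TrinoInputFile`, and a Parquet reader ultimately maps byte ranges onto whole row groups, which this ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: divide a checkpoint file of the given length into
// {offset, length} ranges of at most targetSplitSize bytes each.
final class SplitSketch {
    static List<long[]> computeSplits(long fileLength, long targetSplitSize) {
        List<long[]> splits = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += targetSplitSize) {
            splits.add(new long[] {offset, Math.min(targetSplitSize, fileLength - offset)});
        }
        return splits;
    }
}
```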
Implementation
- New `ParallelUnorderedIterator` class representing a parallel stream of several serial sub-iterators
- `ParallelUnorderedIterator` accepts a `List<Supplier<Stream<T>>>`; the `Supplier` allows us to defer the work of initializing the stream (e.g. `CheckpointEntryIterator` fetches Parquet metadata on construction)
- `ParallelUnorderedIterator` spins out background producer workers using the provided executor. These producers continually pull from their respective iterators and push onto a shared queue, which is pulled from in `computeNext` on the consumer thread
- Updated `CheckpointEntryIterator` to accept an offset and a length, thereby supporting scanning a split of a checkpoint file rather than scanning the whole file in full
- Updated `TableSnapshot#getCheckpointTransactionLogEntries` to support parallel processing: rather than concatenating `CheckpointEntryIterator` instances into a single serialized stream, we pass them into a `ParallelUnorderedIterator`
- When intra-file parallelism is enabled, we create one `CheckpointEntryIterator` per split
- The public interface, `public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(...)`, remains identical

Additional context and related issues
A previous patch introduced a partial solution for parallelism for v2 checkpoint sidecar files. This approach was narrower in scope -- it only parallelized the initialization of the checkpoint iterators, not the actual loading of the checkpoint data. However, we do build on work from that patch, reusing the same executor and the `delta.checkpoint-processing.parallelism` configuration property.

The Delta spec does not require any particular ordering of entries within a checkpoint, nor does it mandate that the files constituting a checkpoint be processed in any particular order -- so, the decision to use an unordered iterator should be safe.
This PR introduces a tremendous number of new tests based on existing features, mostly asserting that the parallel and serial code paths produce equivalent results on the same underlying tables. I erred on the side of caution by writing tests that are as exhaustive as possible, even at the cost of redundancy; if we so choose, I'm happy to remove some of these from the PR.
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: