-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Iceberg direct write #36720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Iceberg direct write #36720
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
39606aa
Iceberg direct write
tomstepp ecdbd93
Make RowSizer compatible with Java 11.
tomstepp 4b05177
Fix some build issues
tomstepp f2e9cc1
Minor updates
tomstepp 232b4bc
More efficient encoded string size calculation
tomstepp 1a8e148
Rm extra parenthesis
tomstepp bb4e53e
Write direct rows to files
tomstepp 121c42e
Address PR feedback
tomstepp 6f85a0b
Remove commented import
tomstepp 783ac20
Mack new Iceberg util methods package private.
tomstepp d2cef52
Add unit test
tomstepp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
170 changes: 170 additions & 0 deletions
170
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.sdk.io.iceberg; | ||
|
|
||
| import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import org.apache.beam.sdk.transforms.DoFn; | ||
| import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; | ||
| import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; | ||
| import org.apache.beam.sdk.transforms.PTransform; | ||
| import org.apache.beam.sdk.transforms.ParDo; | ||
| import org.apache.beam.sdk.transforms.SerializableFunction; | ||
| import org.apache.beam.sdk.values.PCollection; | ||
| import org.apache.beam.sdk.values.PCollectionTuple; | ||
| import org.apache.beam.sdk.values.TupleTag; | ||
| import org.apache.beam.sdk.values.TupleTagList; | ||
| import org.checkerframework.checker.nullness.qual.MonotonicNonNull; | ||
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total | ||
| * size of the bundle in finish_bundle. | ||
| * | ||
| * @param <T> The type of elements in the input PCollection. | ||
| */ | ||
| public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> { | ||
|
|
||
| final TupleTag<T> smallBatchTag; | ||
| final TupleTag<T> largeBatchTag; | ||
| final int threshold; | ||
| final SerializableFunction<T, Integer> elementSizer; | ||
|
|
||
| /** | ||
| * A DoFn that buffers elements within a bundle and outputs them to different tags in | ||
| * finish_bundle based on the total bundle size. | ||
| * | ||
| * @param <T> The type of elements being processed. | ||
| */ | ||
| static class BundleLiftDoFn<T> extends DoFn<T, Void> { | ||
| private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class); | ||
|
|
||
| final TupleTag<T> smallBatchTag; | ||
| final TupleTag<T> largeBatchTag; | ||
| final int threshold; | ||
| final SerializableFunction<T, Integer> elementSizer; | ||
|
|
||
| private transient @MonotonicNonNull List<T> buffer; | ||
| private transient long bundleSizeBytes; | ||
| private transient @Nullable MultiOutputReceiver receiver; | ||
|
|
||
| BundleLiftDoFn( | ||
| TupleTag<T> smallBatchTag, | ||
| TupleTag<T> largeBatchTag, | ||
| int threshold, | ||
| SerializableFunction<T, Integer> elementSizer) { | ||
| this.smallBatchTag = smallBatchTag; | ||
| this.largeBatchTag = largeBatchTag; | ||
| this.threshold = threshold; | ||
| this.elementSizer = elementSizer; | ||
| } | ||
|
|
||
| @StartBundle | ||
| public void startBundle() { | ||
| buffer = new ArrayList<>(); | ||
| receiver = null; | ||
| bundleSizeBytes = 0L; | ||
| } | ||
|
|
||
| @ProcessElement | ||
| public void processElement(@Element T element, MultiOutputReceiver mor) { | ||
| if (receiver == null) { | ||
| receiver = mor; | ||
| } | ||
| checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); | ||
| buffer.add(element); | ||
| bundleSizeBytes += elementSizer.apply(element); | ||
tomstepp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| @FinishBundle | ||
| public void finishBundle() { | ||
| checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); | ||
| if (buffer.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| // Select the target tag based on the bundle size | ||
| TupleTag<T> targetTag; | ||
| targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : largeBatchTag; | ||
| LOG.debug( | ||
| "Emitting {} elements of {} estimated bytes to tag: '{}'", | ||
| buffer.size(), | ||
| bundleSizeBytes, | ||
| targetTag.getId()); | ||
|
|
||
| checkArgumentNotNull(receiver, "Receiver should be set by startBundle."); | ||
| OutputReceiver<T> taggedOutput = receiver.get(targetTag); | ||
|
|
||
| for (T element : buffer) { | ||
| taggedOutput.output(element); | ||
| } | ||
tomstepp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) { | ||
| this(smallBatchTag, largeBatchTag, threshold, x -> 1); | ||
| } | ||
|
|
||
| private BundleLifter( | ||
| TupleTag<T> smallBatchTag, | ||
| TupleTag<T> largeBatchTag, | ||
| int threshold, | ||
| SerializableFunction<T, Integer> elementSizer) { | ||
| if (smallBatchTag == null || largeBatchTag == null) { | ||
| throw new IllegalArgumentException("smallBatchTag and largeBatchTag must not be null"); | ||
| } | ||
| if (smallBatchTag.getId().equals(largeBatchTag.getId())) { | ||
| throw new IllegalArgumentException("smallBatchTag and largeBatchTag must be different"); | ||
| } | ||
| if (threshold <= 0) { | ||
| throw new IllegalArgumentException("Threshold must be a positive integer"); | ||
| } | ||
|
|
||
| this.smallBatchTag = smallBatchTag; | ||
| this.largeBatchTag = largeBatchTag; | ||
| this.threshold = threshold; | ||
| this.elementSizer = elementSizer; | ||
| } | ||
|
|
||
| public static <T> BundleLifter<T> of( | ||
| TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) { | ||
| return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold); | ||
| } | ||
|
|
||
| public static <T> BundleLifter<T> of( | ||
| TupleTag<T> smallBatchTag, | ||
| TupleTag<T> largeBatchTag, | ||
| int threshold, | ||
| SerializableFunction<T, Integer> elementSizer) { | ||
| return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer); | ||
| } | ||
|
|
||
| @Override | ||
| public PCollectionTuple expand(PCollection<T> input) { | ||
| final TupleTag<Void> mainOutputTag = new TupleTag<Void>() {}; | ||
|
|
||
| return input.apply( | ||
| "BundleLiftDoFn", | ||
| ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer)) | ||
| .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag))); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.