From 39606aa7260573256889bb738785c1809ac240d7 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Tue, 4 Nov 2025 07:59:04 -0800 Subject: [PATCH 01/11] Iceberg direct write --- .../beam/sdk/io/iceberg/BundleLifter.java | 169 ++++++++++++++++++ .../apache/beam/sdk/io/iceberg/IcebergIO.java | 11 +- .../IcebergWriteSchemaTransformProvider.java | 13 ++ .../sdk/io/iceberg/WriteToDestinations.java | 125 ++++++++++++- 4 files changed, 311 insertions(+), 7 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java new file mode 100644 index 000000000000..6bea5f00d815 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total + * size of the bundle in finish_bundle. + * + *
<p>
This is the Java equivalent of the BundleLifter PTransform in Python. + * + * @param The type of elements in the input PCollection. + */ +public class BundleLifter extends PTransform, PCollectionTuple> { + + final TupleTag smallBatchTag; + final TupleTag largeBatchTag; + final int threshold; + final SerializableFunction elementSizer; + + /** + * A private, static DoFn that buffers elements within a bundle and outputs them to different tags + * in finish_bundle based on the total bundle size. + * + *
<p>
This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the + * PTransform. + * + * @param The type of elements being processed. + */ + private static class BundleLiftDoFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class); + + final TupleTag smallBatchTag; + final TupleTag largeBatchTag; + final int threshold; + final SerializableFunction elementSizer; + + private transient List buffer; + private transient long bundleSize; + private transient MultiOutputReceiver receiver; + + BundleLiftDoFn( + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { + this.smallBatchTag = smallBatchTag; + this.largeBatchTag = largeBatchTag; + this.threshold = threshold; + this.elementSizer = elementSizer; + } + + @StartBundle + public void startBundle() { + this.buffer = new ArrayList<>(); + this.receiver = null; + this.bundleSize = 0L; + } + + @ProcessElement + public void processElement(@Element T element, MultiOutputReceiver mor) { + if (this.receiver == null) { + this.receiver = mor; + } + buffer.add(element); + bundleSize += this.elementSizer.apply(element); + } + + @FinishBundle + public void finishBundle() { + if (buffer.isEmpty() || this.receiver == null) { + return; + } + + TupleTag targetTag; + + // Select the target tag based on the bundle size + if (bundleSize < threshold) { + targetTag = smallBatchTag; + LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, targetTag.getId()); + } else { + targetTag = largeBatchTag; + LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, targetTag.getId()); + } + + OutputReceiver taggedOutput = this.receiver.get(targetTag); + + for (T element : buffer) { + taggedOutput.output(element); + } + } + } + + public BundleLifter(TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { + this(smallBatchTag, largeBatchTag, threshold, x -> 1); + } + + public BundleLifter( + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { + if (smallBatchTag == null || largeBatchTag == null) { + throw new IllegalArgumentException("smallBatchTag and largeBatchTag must not be null"); + } + if (smallBatchTag.getId().equals(largeBatchTag.getId())) { + throw new IllegalArgumentException("smallBatchTag and largeBatchTag must be different"); + } + if (threshold <= 0) { + throw new IllegalArgumentException("Threshold must be a positive integer"); + } + + this.smallBatchTag = smallBatchTag; + this.largeBatchTag = largeBatchTag; + this.threshold = threshold; + this.elementSizer = elementSizer; + } + + public static BundleLifter of( + TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { + return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold); + } + + public static BundleLifter of( + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { + return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer); + } + + @Override + public PCollectionTuple expand(PCollection input) { + final TupleTag mainOutputTag = new TupleTag() {}; + + return input.apply( + "BundleLiftDoFn", + ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer)) + .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag))); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 956e45651df7..eb335e4a0cab 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -395,6 +395,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract @Nullable Duration getTriggeringFrequency(); + abstract @Nullable Integer getDirectWriteByteLimit(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -407,6 +409,8 @@ abstract static class Builder { abstract Builder setTriggeringFrequency(Duration triggeringFrequency); + abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit); + abstract WriteRows build(); } @@ -435,6 +439,10 @@ public WriteRows withTriggeringFrequency(Duration triggeringFrequency) { return toBuilder().setTriggeringFrequency(triggeringFrequency).build(); } + public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { + return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build(); + } + @Override public IcebergWriteResult expand(PCollection input) { List allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations()); @@ -455,7 +463,8 @@ public IcebergWriteResult expand(PCollection input) { .apply("Assign Table Destinations", new AssignDestinations(destinations)) .apply( "Write Rows to Destinations", - new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency())); + new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency(), + getDirectWriteByteLimit())); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 71c898b00444..cc2922d03618 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -95,6 +95,10 @@ public static Builder builder() { "For a streaming pipeline, sets the frequency at which snapshots are produced.") public abstract @Nullable Integer getTriggeringFrequencySeconds(); + @SchemaFieldDescription( + "For a streaming pipeline, sets the limit for lifting bundles into the direct write path.") + public abstract @Nullable Integer getDirectWriteByteLimit(); + @SchemaFieldDescription( "A list of field names to keep in the input record. All other fields are dropped before writing. 
" + "Is mutually exclusive with 'drop' and 'only'.") @@ -142,6 +146,8 @@ public abstract static class Builder { public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds); + public abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit); + public abstract Builder setKeep(List keep); public abstract Builder setDrop(List drop); @@ -227,6 +233,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { writeTransform = writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq)); } + Integer directWriteByteLimit = configuration.getDirectWriteByteLimit(); + if (directWriteByteLimit != null) { + writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit); + } else { + writeTransform = writeTransform.withDirectWriteByteLimit(null); // off + } + // TODO: support dynamic destinations IcebergWriteResult result = rows.apply(writeTransform); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index fb3bf43f3515..441e4d1af4d4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import java.nio.charset.StandardCharsets; import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -28,15 +29,19 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.util.ShardedKey.Coder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -52,14 +57,17 @@ class WriteToDestinations extends PTransform>, Icebe private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; private final String filePrefix; + private final @Nullable Integer directWriteByteLimit; WriteToDestinations( - IcebergCatalogConfig catalogConfig, - DynamicDestinations dynamicDestinations, - @Nullable Duration triggeringFrequency) { + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + @Nullable Duration triggeringFrequency, + @Nullable Integer directWriteByteLimit) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; this.triggeringFrequency = triggeringFrequency; + this.directWriteByteLimit = directWriteByteLimit; // single unique prefix per write transform this.filePrefix = UUID.randomUUID().toString(); } @@ -67,10 +75,15 @@ class WriteToDestinations 
extends PTransform>, Icebe @Override public IcebergWriteResult expand(PCollection> input) { // Write records to files - PCollection writtenFiles = + PCollection writtenFiles; + if (directWriteByteLimit != null && directWriteByteLimit >= 0) { + writtenFiles = writeTriggeredWithBundleLifting(input); + } else { + writtenFiles = input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) - ? writeTriggered(input) - : writeUntriggered(input); + ? writeTriggered(input) + : writeUntriggered(input); + } // Commit files to tables PCollection> snapshots = @@ -79,6 +92,81 @@ public IcebergWriteResult expand(PCollection> input) { return new IcebergWriteResult(input.getPipeline(), snapshots); } + private PCollection writeTriggeredWithBundleLifting( + PCollection> input) { + checkArgumentNotNull( + triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + + // Lift large bundles to separate output stream for direct writes. + // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs + final TupleTag> groupedRecordsTag = new TupleTag<>("small_batches"); + final TupleTag> directRecordsTag = new TupleTag<>("large_batches"); + + PCollectionTuple bundleOutputs = + input.apply( + BundleLifter.of( + groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer())); + + PCollection> smallBatches = + bundleOutputs + .get(groupedRecordsTag) + .setCoder( + KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); + PCollection> largeBatches = + bundleOutputs + .get(directRecordsTag) + .setCoder( + KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); + + // Group records into batches to avoid writing thousands of small files + PCollection, Iterable>> groupedRecords = + smallBatches + .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) + // We rely on GroupIntoBatches to group and parallelize records properly, + // respecting our thresholds for number of records and bytes per batch. + // Each output batch will be written to a file. + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(FILE_TRIGGERING_BYTE_COUNT) + .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) + .withShardedKey()) + .setCoder( + KvCoder.of( + Coder.of(StringUtf8Coder.of()), + IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + + // TODO(tomstepp): handle the spilled rows. Need an ungrouped rows version which doesn't do + // this. + PCollection directFileWrites = + largeBatches + .apply( + "WriteUngroupedRows", + new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) + .getWrittenFiles(); + + PCollection groupedFileWrites = + groupedRecords.apply( + "WriteGroupedRows", + new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); + + // Flatten grouped write and direct write files. 
+ // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#merging-pcollections + PCollection allFileWrites = + PCollectionList.of(groupedFileWrites) + .and(directFileWrites) + .apply(Flatten.pCollections()); + + // Respect user's triggering frequency before committing snapshots + return allFileWrites.apply( + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); + } + private PCollection writeTriggered(PCollection> input) { checkArgumentNotNull( triggeringFrequency, "Streaming pipelines must set a triggering frequency."); @@ -141,4 +229,29 @@ private PCollection writeUntriggered(PCollection, Integer> { + @Override + public Integer apply(KV element) { + Row row = element.getValue(); + if (row == null) { + return 0; + } + int size = 0; + for (Object value : row.getValues()) { + if (value instanceof String string) { + size += string.getBytes(StandardCharsets.UTF_8).length; + } else if (value instanceof byte[] array) { + size += array.length; + } else { + size += 8; // Approximation for non-string/byte fields + } + } + return size; + } + } } From ecdbd937ffb0522314714c3b436b77b57ed72e52 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Wed, 5 Nov 2025 13:51:32 -0800 Subject: [PATCH 02/11] Make RowSizer compatible with Java 11. --- .../beam/sdk/io/iceberg/WriteToDestinations.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 441e4d1af4d4..ef4707ead3b4 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -52,7 +52,6 @@ class WriteToDestinations extends PTransform>, Icebe private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000; // Used for auto-sharding in streaming. Limits total byte size per batch/file public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB - static final int DEFAULT_NUM_FILE_SHARDS = 0; private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; @@ -135,8 +134,7 @@ private PCollection writeTriggeredWithBundleLifting( Coder.of(StringUtf8Coder.of()), IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); - // TODO(tomstepp): handle the spilled rows. Need an ungrouped rows version which doesn't do - // this. + // TODO(tomstepp): Need an ungrouped rows version which doesn't spill rows. 
PCollection directFileWrites = largeBatches .apply( @@ -238,15 +236,12 @@ private static class RowSizer implements SerializableFunction, I @Override public Integer apply(KV element) { Row row = element.getValue(); - if (row == null) { - return 0; - } int size = 0; for (Object value : row.getValues()) { - if (value instanceof String string) { - size += string.getBytes(StandardCharsets.UTF_8).length; - } else if (value instanceof byte[] array) { - size += array.length; + if (value instanceof String) { + size += ((String) value).getBytes(StandardCharsets.UTF_8).length; + } else if (value instanceof byte[]) { + size += ((byte[]) value).length; } else { size += 8; // Approximation for non-string/byte fields } From 4b051770edaf1403471ccde8d4511d5e82369900 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Wed, 5 Nov 2025 15:29:06 -0800 Subject: [PATCH 03/11] Fix some build issues --- .../beam/sdk/io/iceberg/BundleLifter.java | 41 ++++--- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 7 +- .../IcebergWriteSchemaTransformProvider.java | 4 +- .../sdk/io/iceberg/WriteToDestinations.java | 106 +++++++++--------- 4 files changed, 84 insertions(+), 74 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java index 6bea5f00d815..0ef9bf781eb3 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.transforms.DoFn; @@ -28,6 +30,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,17 +68,20 @@ private static class BundleLiftDoFn extends DoFn { private transient List buffer; private transient long bundleSize; - private transient MultiOutputReceiver receiver; + private transient @Nullable MultiOutputReceiver receiver; BundleLiftDoFn( - TupleTag smallBatchTag, - TupleTag largeBatchTag, - int threshold, - SerializableFunction elementSizer) { + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { this.smallBatchTag = smallBatchTag; this.largeBatchTag = largeBatchTag; this.threshold = threshold; this.elementSizer = elementSizer; + this.buffer = new ArrayList<>(); + this.bundleSize = 0; + this.receiver = null; } @StartBundle @@ -111,6 +117,7 @@ public void finishBundle() { LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, targetTag.getId()); } + checkArgumentNotNull(this.receiver, "Receiver should be set by processElement."); OutputReceiver taggedOutput = this.receiver.get(targetTag); for (T element : buffer) { @@ -124,10 +131,10 @@ public BundleLifter(TupleTag smallBatchTag, TupleTag largeBatchTag, int th } public BundleLifter( - TupleTag smallBatchTag, - TupleTag largeBatchTag, - int threshold, - SerializableFunction elementSizer) { + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { if (smallBatchTag == null || largeBatchTag == null) { throw new IllegalArgumentException("smallBatchTag and 
largeBatchTag must not be null"); } @@ -145,15 +152,15 @@ public BundleLifter( } public static BundleLifter of( - TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { + TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold); } public static BundleLifter of( - TupleTag smallBatchTag, - TupleTag largeBatchTag, - int threshold, - SerializableFunction elementSizer) { + TupleTag smallBatchTag, + TupleTag largeBatchTag, + int threshold, + SerializableFunction elementSizer) { return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer); } @@ -162,8 +169,8 @@ public PCollectionTuple expand(PCollection input) { final TupleTag mainOutputTag = new TupleTag() {}; return input.apply( - "BundleLiftDoFn", - ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer)) - .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag))); + "BundleLiftDoFn", + ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer)) + .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag))); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index eb335e4a0cab..ee35efb4129c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -463,8 +463,11 @@ public IcebergWriteResult expand(PCollection input) { .apply("Assign Table Destinations", new AssignDestinations(destinations)) .apply( "Write Rows to Destinations", - new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency(), - getDirectWriteByteLimit())); + new WriteToDestinations( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getDirectWriteByteLimit())); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index cc2922d03618..428ef71f23e5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -96,7 +96,7 @@ public static Builder builder() { public abstract @Nullable Integer getTriggeringFrequencySeconds(); @SchemaFieldDescription( - "For a streaming pipeline, sets the limit for lifting bundles into the direct write path.") + "For a streaming pipeline, sets the limit for lifting bundles into the direct write path.") public abstract @Nullable Integer getDirectWriteByteLimit(); @SchemaFieldDescription( @@ -236,8 +236,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { Integer directWriteByteLimit = configuration.getDirectWriteByteLimit(); if (directWriteByteLimit != null) { writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit); - } else { - writeTransform = writeTransform.withDirectWriteByteLimit(null); // off } // TODO: support dynamic destinations diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index ef4707ead3b4..4a2fd6c91a71 100644 --- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -59,10 +59,10 @@ class WriteToDestinations extends PTransform>, Icebe private final @Nullable Integer directWriteByteLimit; WriteToDestinations( - IcebergCatalogConfig catalogConfig, - DynamicDestinations dynamicDestinations, - @Nullable Duration triggeringFrequency, - @Nullable Integer directWriteByteLimit) { + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + @Nullable Duration triggeringFrequency, + @Nullable Integer directWriteByteLimit) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; this.triggeringFrequency = triggeringFrequency; @@ -79,9 +79,9 @@ public IcebergWriteResult expand(PCollection> input) { writtenFiles = writeTriggeredWithBundleLifting(input); } else { writtenFiles = - input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) - ? writeTriggered(input) - : writeUntriggered(input); + input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) + ? writeTriggered(input) + : writeUntriggered(input); } // Commit files to tables @@ -92,9 +92,11 @@ public IcebergWriteResult expand(PCollection> input) { } private PCollection writeTriggeredWithBundleLifting( - PCollection> input) { + PCollection> input) { checkArgumentNotNull( - triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + triggeringFrequency, "Streaming pipelines must set a triggering frequency."); + checkArgumentNotNull( + directWriteByteLimit, "Must set non-null directWriteByteLimit for bundle lifting."); // Lift large bundles to separate output stream for direct writes. // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs @@ -102,67 +104,67 @@ private PCollection writeTriggeredWithBundleLifting( final TupleTag> directRecordsTag = new TupleTag<>("large_batches"); PCollectionTuple bundleOutputs = - input.apply( - BundleLifter.of( - groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer())); + input.apply( + BundleLifter.of( + groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer())); PCollection> smallBatches = - bundleOutputs - .get(groupedRecordsTag) - .setCoder( - KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); + bundleOutputs + .get(groupedRecordsTag) + .setCoder( + KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); PCollection> largeBatches = - bundleOutputs - .get(directRecordsTag) - .setCoder( - KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); + bundleOutputs + .get(directRecordsTag) + .setCoder( + KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); // Group records into batches to avoid writing thousands of small files PCollection, Iterable>> groupedRecords = - smallBatches - .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) - // We rely on GroupIntoBatches to group and parallelize records properly, - // respecting our thresholds for number of records and bytes per batch. - // Each output batch will be written to a file. 
- .apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(FILE_TRIGGERING_BYTE_COUNT) - .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) - .withShardedKey()) - .setCoder( - KvCoder.of( - Coder.of(StringUtf8Coder.of()), - IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + smallBatches + .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) + // We rely on GroupIntoBatches to group and parallelize records properly, + // respecting our thresholds for number of records and bytes per batch. + // Each output batch will be written to a file. + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(FILE_TRIGGERING_BYTE_COUNT) + .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) + .withShardedKey()) + .setCoder( + KvCoder.of( + Coder.of(StringUtf8Coder.of()), + IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); // TODO(tomstepp): Need an ungrouped rows version which doesn't spill rows. PCollection directFileWrites = - largeBatches - .apply( - "WriteUngroupedRows", - new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) - .getWrittenFiles(); + largeBatches + .apply( + "WriteUngroupedRows", + new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) + .getWrittenFiles(); PCollection groupedFileWrites = - groupedRecords.apply( - "WriteGroupedRows", - new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); + groupedRecords.apply( + "WriteGroupedRows", + new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); // Flatten grouped write and direct write files. // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#merging-pcollections PCollection allFileWrites = - PCollectionList.of(groupedFileWrites) - .and(directFileWrites) - .apply(Flatten.pCollections()); + PCollectionList.of(groupedFileWrites) + .and(directFileWrites) + .apply(Flatten.pCollections()); // Respect user's triggering frequency before committing snapshots return allFileWrites.apply( - "ApplyUserTrigger", - Window.into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) - .discardingFiredPanes()); + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); } private PCollection writeTriggered(PCollection> input) { From f2e9cc10727c1e26f33730f411a37146a098160c Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 6 Nov 2025 17:12:44 -0800 Subject: [PATCH 04/11] Minor updates --- .../beam/sdk/io/iceberg/BundleLifter.java | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java index 0ef9bf781eb3..28cd2e9950fd 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import 
org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -30,6 +31,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +68,7 @@ private static class BundleLiftDoFn extends DoFn { final int threshold; final SerializableFunction elementSizer; - private transient List buffer; + private transient @MonotonicNonNull List buffer; private transient long bundleSize; private transient @Nullable MultiOutputReceiver receiver; @@ -79,30 +81,29 @@ private static class BundleLiftDoFn extends DoFn { this.largeBatchTag = largeBatchTag; this.threshold = threshold; this.elementSizer = elementSizer; - this.buffer = new ArrayList<>(); - this.bundleSize = 0; - this.receiver = null; } @StartBundle public void startBundle() { - this.buffer = new ArrayList<>(); - this.receiver = null; - this.bundleSize = 0L; + buffer = new ArrayList<>(); + receiver = null; + bundleSize = 0L; } @ProcessElement public void processElement(@Element T element, MultiOutputReceiver mor) { - if (this.receiver == null) { - this.receiver = mor; + if (receiver == null) { + receiver = mor; } + checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); buffer.add(element); - bundleSize += this.elementSizer.apply(element); + bundleSize += elementSizer.apply(element); } @FinishBundle public void finishBundle() { - if (buffer.isEmpty() || this.receiver == null) { + checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); + if (buffer.isEmpty()) { return; } @@ -117,8 +118,8 @@ public void finishBundle() { LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, targetTag.getId()); } - checkArgumentNotNull(this.receiver, "Receiver should be set by processElement."); - OutputReceiver taggedOutput = this.receiver.get(targetTag); + checkArgumentNotNull(receiver, "Receiver should be set by startBundle."); + OutputReceiver taggedOutput = receiver.get(targetTag); for (T element : buffer) { taggedOutput.output(element); From 232b4bc7f98df9fddd88a217af10314093c96bb6 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 6 Nov 2025 17:42:50 -0800 Subject: [PATCH 05/11] More efficient encoded string size calculation --- .../org/apache/beam/sdk/io/iceberg/WriteToDestinations.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 4a2fd6c91a71..bd78a01daa64 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -19,7 +19,6 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import java.nio.charset.StandardCharsets; import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -43,6 +42,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -241,7 +241,7 @@ public Integer apply(KV element) { int size = 0; for (Object value : row.getValues()) { if (value instanceof String) { - size += ((String) value).getBytes(StandardCharsets.UTF_8).length; + size += Utf8.encodedLength(((String) value)); } else if (value instanceof byte[]) { size += ((byte[]) value).length; } else { From 1a8e1481bc8295a54655bdd93e0b7bee77adbaba Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 6 Nov 2025 17:47:31 -0800 Subject: [PATCH 06/11] Rm extra parenthesis --- .../org/apache/beam/sdk/io/iceberg/WriteToDestinations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index bd78a01daa64..aad2c8246b8a 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -241,7 +241,7 @@ public Integer apply(KV element) { int size = 0; for (Object value : row.getValues()) { if (value instanceof String) { - size += Utf8.encodedLength(((String) value)); + size += Utf8.encodedLength((String) value); } else if (value instanceof byte[]) { size += ((byte[]) value).length; } else { From bb4e53eb65ca61152ceb9be63642eb5ecf1217de Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 7 Nov 2025 14:07:13 -0800 Subject: [PATCH 07/11] Write direct rows to files --- .../io/iceberg/WriteDirectRowsToFiles.java | 141 ++++++++++++++++++ .../io/iceberg/WriteGroupedRowsToFiles.java | 9 +- .../sdk/io/iceberg/WriteToDestinations.java | 34 ++--- .../io/iceberg/WriteUngroupedRowsToFiles.java | 9 +- 4 files changed, 165 insertions(+), 28 deletions(-) create mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java new file mode 100644 index 000000000000..8835e2ff628b --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg; + +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.catalog.Catalog; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +class WriteDirectRowsToFiles + extends PTransform>, PCollection> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private final String filePrefix; + private final long maxBytesPerFile; + + WriteDirectRowsToFiles( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + String filePrefix, + long maxBytesPerFile) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + this.maxBytesPerFile = maxBytesPerFile; + } + + @Override + public PCollection expand(PCollection> input) { + return input.apply( + ParDo.of( + new WriteDirectRowsToFilesDoFn( + catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix))); + } + + private static class WriteDirectRowsToFilesDoFn extends DoFn, FileWriteResult> { + + private final DynamicDestinations dynamicDestinations; + private final IcebergCatalogConfig catalogConfig; + private transient @MonotonicNonNull Catalog catalog; + private final String filePrefix; + private final long maxFileSize; + private transient @Nullable RecordWriterManager recordWriterManager; + + WriteDirectRowsToFilesDoFn( + IcebergCatalogConfig catalogConfig, + DynamicDestinations dynamicDestinations, + long maxFileSize, + String filePrefix) { + this.catalogConfig = catalogConfig; + this.dynamicDestinations = dynamicDestinations; + this.filePrefix = filePrefix; + this.maxFileSize = maxFileSize; + this.recordWriterManager = null; + } + + private org.apache.iceberg.catalog.Catalog getCatalog() { + if (catalog == null) { + this.catalog = catalogConfig.catalog(); + } + return catalog; + } + + @StartBundle + public void startBundle() { + recordWriterManager = + new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE); + } + + @ProcessElement + public void processElement( + ProcessContext context, + @Element KV element, + BoundedWindow window, + PaneInfo paneInfo) + throws Exception { + String tableIdentifier = element.getKey(); + IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); + WindowedValue windowedDestination = + WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); + Preconditions.checkNotNull(recordWriterManager) + .write(windowedDestination, element.getValue()); + } + + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + if (recordWriterManager == null) { + return; + } + recordWriterManager.close(); + + for (Map.Entry, List> + destinationAndFiles : + 
Preconditions.checkNotNull(recordWriterManager) + .getSerializableDataFiles() + .entrySet()) { + WindowedValue windowedDestination = destinationAndFiles.getKey(); + + for (SerializableDataFile dataFile : destinationAndFiles.getValue()) { + context.output( + FileWriteResult.builder() + .setSerializableDataFile(dataFile) + .setTableIdentifier(windowedDestination.getValue().getTableIdentifier()) + .build(), + windowedDestination.getTimestamp(), + Iterables.getFirst(windowedDestination.getWindows(), null)); + } + } + recordWriterManager = null; + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 7db1ac426595..12d9570d4a38 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -36,8 +36,7 @@ class WriteGroupedRowsToFiles extends PTransform< PCollection, Iterable>>, PCollection> { - - private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb + private final long maxBytesPerFile; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; @@ -46,10 +45,12 @@ class WriteGroupedRowsToFiles WriteGroupedRowsToFiles( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, - String filePrefix) { + String filePrefix, + long maxBytesPerFile) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; this.filePrefix = filePrefix; + this.maxBytesPerFile = maxBytesPerFile; } @Override @@ -58,7 +59,7 @@ public PCollection expand( return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( - catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE, filePrefix))); + catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix))); } private static class WriteGroupedRowsToFilesDoFn diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index aad2c8246b8a..76d3cefb0034 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -52,6 +52,7 @@ class WriteToDestinations extends PTransform>, Icebe private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000; // Used for auto-sharding in streaming. Limits total byte size per batch/file public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB + private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb private final IcebergCatalogConfig catalogConfig; private final DynamicDestinations dynamicDestinations; private final @Nullable Duration triggeringFrequency; @@ -98,11 +99,10 @@ private PCollection writeTriggeredWithBundleLifting( checkArgumentNotNull( directWriteByteLimit, "Must set non-null directWriteByteLimit for bundle lifting."); - // Lift large bundles to separate output stream for direct writes. 
- // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs final TupleTag> groupedRecordsTag = new TupleTag<>("small_batches"); final TupleTag> directRecordsTag = new TupleTag<>("large_batches"); + input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows())); PCollectionTuple bundleOutputs = input.apply( BundleLifter.of( @@ -119,13 +119,8 @@ private PCollection writeTriggeredWithBundleLifting( .setCoder( KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); - // Group records into batches to avoid writing thousands of small files PCollection, Iterable>> groupedRecords = smallBatches - .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) - // We rely on GroupIntoBatches to group and parallelize records properly, - // respecting our thresholds for number of records and bytes per batch. - // Each output batch will be written to a file. .apply( GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) .withByteSize(FILE_TRIGGERING_BYTE_COUNT) @@ -136,27 +131,23 @@ private PCollection writeTriggeredWithBundleLifting( Coder.of(StringUtf8Coder.of()), IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); - // TODO(tomstepp): Need an ungrouped rows version which doesn't spill rows. PCollection directFileWrites = - largeBatches - .apply( - "WriteUngroupedRows", - new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) - .getWrittenFiles(); + largeBatches.apply( + "WriteDirectRowsToFiles", + new WriteDirectRowsToFiles( + catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)); PCollection groupedFileWrites = groupedRecords.apply( "WriteGroupedRows", - new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)); + new WriteGroupedRowsToFiles( + catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)); - // Flatten grouped write and direct write files. 
- // https://beam.apache.org/documentation/pipelines/design-your-pipeline/#merging-pcollections PCollection allFileWrites = PCollectionList.of(groupedFileWrites) .and(directFileWrites) .apply(Flatten.pCollections()); - // Respect user's triggering frequency before committing snapshots return allFileWrites.apply( "ApplyUserTrigger", Window.into(new GlobalWindows()) @@ -191,7 +182,8 @@ private PCollection writeTriggered(PCollection> return groupedRecords .apply( "WriteGroupedRows", - new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix)) + new WriteGroupedRowsToFiles( + catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)) // Respect user's triggering frequency before committing snapshots .apply( "ApplyUserTrigger", @@ -214,7 +206,8 @@ private PCollection writeUntriggered(PCollection writeGroupedResult = @@ -223,7 +216,8 @@ private PCollection writeUntriggered(PCollection WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles"); private static final TupleTag WRITTEN_ROWS_TAG = new TupleTag("writtenRows") {}; private static final TupleTag, Row>> SPILLED_ROWS_TAG = @@ -75,14 +73,17 @@ class WriteUngroupedRowsToFiles private final String filePrefix; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final long maxBytesPerFile; WriteUngroupedRowsToFiles( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, - String filePrefix) { + String filePrefix, + long maxBytesPerFile) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; this.filePrefix = filePrefix; + this.maxBytesPerFile = maxBytesPerFile; } @Override @@ -96,7 +97,7 @@ public Result expand(PCollection> input) { dynamicDestinations, filePrefix, DEFAULT_MAX_WRITERS_PER_BUNDLE, - DEFAULT_MAX_BYTES_PER_FILE)) + maxBytesPerFile)) .withOutputTags( WRITTEN_FILES_TAG, TupleTagList.of(ImmutableList.of(WRITTEN_ROWS_TAG, SPILLED_ROWS_TAG)))); From 121c42eaab38b875b5110f449506cbd442d1c801 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 20 Nov 2025 16:48:05 -0800 Subject: [PATCH 08/11] Address PR feedback --- .../beam/sdk/io/iceberg/BundleLifter.java | 31 ++-- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 6 + .../beam/sdk/io/iceberg/IcebergUtils.java | 9 + .../sdk/io/iceberg/WriteToDestinations.java | 155 +++++++++--------- 4 files changed, 109 insertions(+), 92 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java index 28cd2e9950fd..5ceb1c18d696 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java @@ -40,8 +40,6 @@ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total * size of the bundle in finish_bundle. * - *
* <p>
This is the Java equivalent of the BundleLifter PTransform in Python. - * * @param The type of elements in the input PCollection. */ public class BundleLifter extends PTransform, PCollectionTuple> { @@ -55,9 +53,6 @@ public class BundleLifter extends PTransform, PCollectionTuple * A private, static DoFn that buffers elements within a bundle and outputs them to different tags * in finish_bundle based on the total bundle size. * - *
* <p>
This is the Java equivalent of the _BundleLiftDoFn in Python, now merged inside the - * PTransform. - * * @param The type of elements being processed. */ private static class BundleLiftDoFn extends DoFn { @@ -69,7 +64,7 @@ private static class BundleLiftDoFn extends DoFn { final SerializableFunction elementSizer; private transient @MonotonicNonNull List buffer; - private transient long bundleSize; + private transient long bundleSizeBytes; private transient @Nullable MultiOutputReceiver receiver; BundleLiftDoFn( @@ -87,7 +82,7 @@ private static class BundleLiftDoFn extends DoFn { public void startBundle() { buffer = new ArrayList<>(); receiver = null; - bundleSize = 0L; + bundleSizeBytes = 0L; } @ProcessElement @@ -97,7 +92,7 @@ public void processElement(@Element T element, MultiOutputReceiver mor) { } checkArgumentNotNull(buffer, "Buffer should be set by startBundle."); buffer.add(element); - bundleSize += elementSizer.apply(element); + bundleSizeBytes += elementSizer.apply(element); } @FinishBundle @@ -107,16 +102,14 @@ public void finishBundle() { return; } - TupleTag targetTag; - // Select the target tag based on the bundle size - if (bundleSize < threshold) { - targetTag = smallBatchTag; - LOG.debug("Emitting {} elements to small tag: '{}'", bundleSize, targetTag.getId()); - } else { - targetTag = largeBatchTag; - LOG.debug("Emitting {} elements to large tag: '{}'", bundleSize, targetTag.getId()); - } + TupleTag targetTag; + targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : largeBatchTag; + LOG.debug( + "Emitting {} elements of {} estimated bytes to tag: '{}'", + buffer.size(), + bundleSizeBytes, + targetTag.getId()); checkArgumentNotNull(receiver, "Receiver should be set by startBundle."); OutputReceiver taggedOutput = receiver.get(targetTag); @@ -127,11 +120,11 @@ public void finishBundle() { } } - public BundleLifter(TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { + private BundleLifter(TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold) { this(smallBatchTag, largeBatchTag, threshold, x -> 1); } - public BundleLifter( + private BundleLifter( TupleTag smallBatchTag, TupleTag largeBatchTag, int threshold, diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index ee35efb4129c..114c013efcb8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +// import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; @@ -459,6 +460,11 @@ public IcebergWriteResult expand(PCollection input) { // Assign destinations before re-windowing to global in WriteToDestinations because // user's dynamic destination may depend on windowing properties + if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) { + Preconditions.checkArgument( + IcebergUtils.isUnbounded(input), + "Must only provide direct write limit for unbounded pipelines."); + } return input .apply("Assign Table Destinations", new AssignDestinations(destinations)) .apply( diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 
0c2bc71c6f8b..c56635e68c5b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -596,4 +597,12 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType // LocalDateTime, LocalDate, LocalTime return icebergValue; } + + public static boolean isUnbounded(PCollection input) { + return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED); + } + + public static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) { + return directWriteByteLimit != null && directWriteByteLimit >= 0; + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java index 76d3cefb0034..bea84fc826b7 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java @@ -17,8 +17,11 @@ */ package org.apache.beam.sdk.io.iceberg; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -34,7 +37,6 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; -import org.apache.beam.sdk.util.ShardedKey.Coder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -42,7 +44,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -76,13 +77,13 @@ class WriteToDestinations extends PTransform>, Icebe public IcebergWriteResult expand(PCollection> input) { // Write records to files PCollection writtenFiles; - if (directWriteByteLimit != null && directWriteByteLimit >= 0) { - writtenFiles = writeTriggeredWithBundleLifting(input); - } else { + if (IcebergUtils.isUnbounded(input)) { writtenFiles = - input.isBounded().equals(PCollection.IsBounded.UNBOUNDED) - ? writeTriggered(input) - : writeUntriggered(input); + IcebergUtils.validDirectWriteLimit(directWriteByteLimit) + ? 
writeTriggeredWithBundleLifting(input) + : writeTriggered(input); + } else { + writtenFiles = writeUntriggered(input); } // Commit files to tables @@ -92,6 +93,39 @@ public IcebergWriteResult expand(PCollection> input) { return new IcebergWriteResult(input.getPipeline(), snapshots); } + private PCollection groupAndWriteRecords(PCollection> input) { + // We rely on GroupIntoBatches to group and parallelize records properly, + // respecting our thresholds for number of records and bytes per batch. + // Each output batch will be written to a file. + PCollection, Iterable>> groupedRecords = + input + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(FILE_TRIGGERING_BYTE_COUNT) + .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) + .withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()), + IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); + + return groupedRecords.apply( + "WriteGroupedRows", + new WriteGroupedRowsToFiles( + catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)); + } + + private PCollection applyUserTriggering(PCollection input) { + return input.apply( + "ApplyUserTrigger", + Window.into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) + .discardingFiredPanes()); + } + private PCollection writeTriggeredWithBundleLifting( PCollection> input) { checkArgumentNotNull( @@ -119,80 +153,28 @@ private PCollection writeTriggeredWithBundleLifting( .setCoder( KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema()))); - PCollection, Iterable>> groupedRecords = - smallBatches - .apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(FILE_TRIGGERING_BYTE_COUNT) - .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) - .withShardedKey()) - .setCoder( - KvCoder.of( - Coder.of(StringUtf8Coder.of()), - IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); - PCollection directFileWrites = largeBatches.apply( "WriteDirectRowsToFiles", new WriteDirectRowsToFiles( catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)); - PCollection groupedFileWrites = - groupedRecords.apply( - "WriteGroupedRows", - new WriteGroupedRowsToFiles( - catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)); + PCollection groupedFileWrites = groupAndWriteRecords(smallBatches); PCollection allFileWrites = PCollectionList.of(groupedFileWrites) .and(directFileWrites) .apply(Flatten.pCollections()); - return allFileWrites.apply( - "ApplyUserTrigger", - Window.into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) - .discardingFiredPanes()); + return applyUserTriggering(allFileWrites); } private PCollection writeTriggered(PCollection> input) { checkArgumentNotNull( triggeringFrequency, "Streaming pipelines must set a triggering frequency."); - - // Group records into batches to avoid writing thousands of small files - PCollection, Iterable>> groupedRecords = - input - .apply("WindowIntoGlobal", Window.into(new GlobalWindows())) - // We rely on GroupIntoBatches to group and parallelize records properly, - // respecting our thresholds for number of records and bytes per batch. - // Each output batch will be written to a file. 
- .apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(FILE_TRIGGERING_BYTE_COUNT) - .withMaxBufferingDuration(checkArgumentNotNull(triggeringFrequency)) - .withShardedKey()) - .setCoder( - KvCoder.of( - org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()), - IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema())))); - - return groupedRecords - .apply( - "WriteGroupedRows", - new WriteGroupedRowsToFiles( - catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE)) - // Respect user's triggering frequency before committing snapshots - .apply( - "ApplyUserTrigger", - Window.into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(checkArgumentNotNull(triggeringFrequency)))) - .discardingFiredPanes()); + input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows())); + PCollection files = groupAndWriteRecords(input); + return applyUserTriggering(files); } private PCollection writeUntriggered(PCollection> input) { @@ -231,18 +213,45 @@ private PCollection writeUntriggered(PCollection, Integer> { @Override public Integer apply(KV element) { - Row row = element.getValue(); + return estimateRowSize(element.getValue()); + } + + private int estimateRowSize(Row row) { + if (row == null) { + return 0; + } int size = 0; for (Object value : row.getValues()) { - if (value instanceof String) { - size += Utf8.encodedLength((String) value); - } else if (value instanceof byte[]) { - size += ((byte[]) value).length; - } else { - size += 8; // Approximation for non-string/byte fields - } + size += estimateObjectSize(value); } return size; } + + private int estimateObjectSize(@Nullable Object value) { + if (value == null) { + return 0; + } + if (value instanceof String) { + return ((String) value).getBytes(UTF_8).length; + } else if (value instanceof byte[]) { + return ((byte[]) value).length; + } else if (value instanceof Row) { + return estimateRowSize((Row) value); + } else if (value instanceof List) { + int listSize = 0; + for (Object item : (List) value) { + listSize += estimateObjectSize(item); + } + return listSize; + } else if (value instanceof Map) { + int mapSize = 0; + for (Map.Entry entry : ((Map) value).entrySet()) { + mapSize += estimateObjectSize(entry.getKey()) + estimateObjectSize(entry.getValue()); + } + return mapSize; + } else { + return 8; // Approximation for other fields + } + } } } From 6f85a0b7d88edfb0e2faf39fc3b2b0588129cb71 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 20 Nov 2025 19:03:31 -0800 Subject: [PATCH 09/11] Remove commented import --- .../src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index 114c013efcb8..1d71ad549094 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.iceberg; -// import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import com.google.auto.value.AutoValue; From 783ac20d6ff9ef7b852284bcd679aba94fbfd3f7 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Fri, 21 Nov 2025 08:41:19 -0800 Subject: [PATCH 10/11] Make new Iceberg util
methods package private. --- .../java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index c56635e68c5b..f4844db8aa81 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -598,11 +598,11 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType return icebergValue; } - public static boolean isUnbounded(PCollection input) { + static boolean isUnbounded(PCollection input) { return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED); } - public static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) { + static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) { return directWriteByteLimit != null && directWriteByteLimit >= 0; } } From d2cef52bcfa238d235309f10a7e7a2edf2133e1d Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Mon, 24 Nov 2025 13:52:23 -0800 Subject: [PATCH 11/11] Add unit test --- .../beam/sdk/io/iceberg/BundleLifter.java | 6 +- .../beam/sdk/io/iceberg/BundleLifterTest.java | 99 +++++++++++++++++++ 2 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java index 5ceb1c18d696..639e247357f9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java @@ -50,12 +50,12 @@ public class BundleLifter extends PTransform, PCollectionTuple final SerializableFunction elementSizer; /** - * A private, static DoFn that buffers elements within a bundle and outputs them to different tags - * in finish_bundle based on the total bundle size. + * A DoFn that buffers elements within a bundle and outputs them to different tags in + * finish_bundle based on the total bundle size. * * @param The type of elements being processed. */ - private static class BundleLiftDoFn extends DoFn { + static class BundleLiftDoFn extends DoFn { private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class); final TupleTag smallBatchTag; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java new file mode 100644 index 000000000000..1eaa0920e6c6 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; + +import org.apache.beam.sdk.io.iceberg.BundleLifter.BundleLiftDoFn; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class BundleLifterTest { + + private static final TupleTag INTEGER_SMALL = new TupleTag() {}; + private static final TupleTag INTEGER_LARGE = new TupleTag() {}; + private static final TupleTag STRING_SMALL = new TupleTag() {}; + private static final TupleTag STRING_LARGE = new TupleTag() {}; + + @Test + public void testSmallBundle() throws Exception { + DoFnTester tester = + DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1)); + + tester.startBundle(); + tester.processElement(1); + tester.processElement(2); + tester.finishBundle(); + + assertThat(tester.peekOutputElements(INTEGER_SMALL), containsInAnyOrder(1, 2)); + assertThat(tester.peekOutputElements(INTEGER_LARGE), empty()); + } + + @Test + public void testLargeBundle() throws Exception { + DoFnTester tester = + DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1)); + + tester.startBundle(); + tester.processElement(1); + tester.processElement(2); + tester.processElement(3); + tester.finishBundle(); + + assertThat(tester.peekOutputElements(INTEGER_SMALL), empty()); + assertThat(tester.peekOutputElements(INTEGER_LARGE), containsInAnyOrder(1, 2, 3)); + } + + @Test + public void testSmallBundleWithSizer() throws Exception { + DoFnTester tester = + DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length())); + + tester.startBundle(); + tester.processElement("123"); + tester.processElement("456"); + tester.processElement("789"); + tester.finishBundle(); + + assertThat(tester.peekOutputElements(STRING_SMALL), containsInAnyOrder("123", "456", "789")); + assertThat(tester.peekOutputElements(STRING_LARGE), empty()); + } + + @Test + public void testLargeBundleWithSizer() throws Exception { + DoFnTester tester = + DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length())); + + tester.startBundle(); + tester.processElement("123"); + tester.processElement("456"); + tester.processElement("789"); + tester.processElement("0"); + tester.finishBundle(); + + assertThat(tester.peekOutputElements(STRING_SMALL), empty()); + assertThat( + tester.peekOutputElements(STRING_LARGE), containsInAnyOrder("123", "456", "789", "0")); + } +}
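For reference, a minimal usage sketch of how BundleLifter could be applied in a pipeline and its two tagged outputs consumed. This sketch is not part of the patch: it assumes the BundleLifter.of(...) factory remains publicly visible, and the tag names, the 1 KiB threshold, the string-length sizer, and the downstream steps noted in comments are illustrative only.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.iceberg.BundleLifter;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;

    public class BundleLifterSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Bundles whose estimated size stays under the threshold are emitted on `small`;
        // bundles at or above the threshold are emitted on `large`.
        TupleTag<String> small = new TupleTag<String>() {};
        TupleTag<String> large = new TupleTag<String>() {};

        PCollection<String> records =
            pipeline.apply("CreateRecords", Create.of("a", "bb", "ccc"));

        // Route each bundle by its total string length against a 1 KiB threshold (assumed values).
        PCollectionTuple routed =
            records.apply("LiftBundles", BundleLifter.of(small, large, 1024, s -> s.length()));

        PCollection<String> smallBundles = routed.get(small); // e.g. feed into GroupIntoBatches
        PCollection<String> largeBundles = routed.get(large); // e.g. write directly to data files

        pipeline.run().waitUntilFinish();
      }
    }

The DoFnTester-based tests above exercise the same routing deterministically because they control bundle boundaries explicitly via startBundle/finishBundle, which a runner-driven TestPipeline run would not guarantee.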