diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
new file mode 100644
index 000000000000..639e247357f9
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
+ * size of the bundle in finish_bundle.
+ *
+ * @param <T> The type of elements in the input PCollection.
+ */
+public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  final TupleTag<T> smallBatchTag;
+  final TupleTag<T> largeBatchTag;
+  final int threshold;
+  final SerializableFunction<T, Integer> elementSizer;
+
+  /**
+   * A DoFn that buffers elements within a bundle and outputs them to different tags in
+   * finish_bundle based on the total bundle size.
+   *
+   * @param <T> The type of elements being processed.
+   */
+  static class BundleLiftDoFn<T> extends DoFn<T, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);
+
+    final TupleTag<T> smallBatchTag;
+    final TupleTag<T> largeBatchTag;
+    final int threshold;
+    final SerializableFunction<T, Integer> elementSizer;
+
+    private transient @MonotonicNonNull List<T> buffer;
+    private transient long bundleSizeBytes;
+    private transient @Nullable MultiOutputReceiver receiver;
+
+    BundleLiftDoFn(
+        TupleTag<T> smallBatchTag,
+        TupleTag<T> largeBatchTag,
+        int threshold,
+        SerializableFunction<T, Integer> elementSizer) {
+      this.smallBatchTag = smallBatchTag;
+      this.largeBatchTag = largeBatchTag;
+      this.threshold = threshold;
+      this.elementSizer = elementSizer;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      buffer = new ArrayList<>();
+      receiver = null;
+      bundleSizeBytes = 0L;
+    }
+
+    @ProcessElement
+    public void processElement(@Element T element, MultiOutputReceiver mor) {
+      if (receiver == null) {
+        receiver = mor;
+      }
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      buffer.add(element);
+      bundleSizeBytes += elementSizer.apply(element);
+    }
+
+    @FinishBundle
+    public void finishBundle() {
+      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
+      if (buffer.isEmpty()) {
+        return;
+      }
+
+      // Select the target tag based on the bundle size
+      TupleTag<T> targetTag;
+      targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : largeBatchTag;
+      LOG.debug(
+          "Emitting {} elements of {} estimated bytes to tag: '{}'",
+          buffer.size(),
+          bundleSizeBytes,
+          targetTag.getId());
+
+      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
+      OutputReceiver<T> taggedOutput = receiver.get(targetTag);
+
+      for (T element : buffer) {
+        taggedOutput.output(element);
+      }
+    }
+  }
+
+  private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+    this(smallBatchTag, largeBatchTag, threshold, x -> 1);
+  }
+
+  private BundleLifter(
+      TupleTag<T> smallBatchTag,
+      TupleTag<T> largeBatchTag,
+      int threshold,
+      SerializableFunction<T, Integer> elementSizer) {
+    if (smallBatchTag == null || largeBatchTag == null) {
+      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must not be null");
+    }
+    if (smallBatchTag.getId().equals(largeBatchTag.getId())) {
+      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must be different");
+    }
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be a positive integer");
+    }
+
+    this.smallBatchTag = smallBatchTag;
+    this.largeBatchTag = largeBatchTag;
+    this.threshold = threshold;
+    this.elementSizer = elementSizer;
+  }
+
+  public static <T> BundleLifter<T> of(
+      TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
+    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold);
+  }
+
+  public static <T> BundleLifter<T> of(
+      TupleTag<T> smallBatchTag,
+      TupleTag<T> largeBatchTag,
+      int threshold,
+      SerializableFunction<T, Integer> elementSizer) {
+    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer);
+  }
+
+  @Override
+  public PCollectionTuple expand(PCollection<T> input) {
+    final TupleTag<T> mainOutputTag = new TupleTag<T>() {};
+
+    return input.apply(
+        "BundleLiftDoFn",
+        ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer))
+            .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag)));
+  }
+}
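Reviewer note: a minimal usage sketch of the new transform (the input collection, step name, and threshold below are hypothetical, not part of this change). Each bundle's elements are summed with the supplied sizer; bundles whose estimated size stays under the threshold go to the first tag, all others to the second.

    TupleTag<String> smallTag = new TupleTag<String>("small") {};
    TupleTag<String> largeTag = new TupleTag<String>("large") {};
    PCollection<String> words = ...; // hypothetical unbounded input
    PCollectionTuple routed =
        words.apply(
            "RouteByBundleSize",
            BundleLifter.of(smallTag, largeTag, 1024, s -> s.length()));
    // Callers typically set coders on the tagged outputs before consuming them,
    // as WriteToDestinations does below.
    PCollection<String> smallBundles = routed.get(smallTag);
    PCollection<String> largeBundles = routed.get(largeTag);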
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 956e45651df7..1d71ad549094 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -395,6 +395,8 @@ public abstract static class WriteRows extends PTransform<PCollection<Row>, Iceb
 
     abstract @Nullable Duration getTriggeringFrequency();
 
+    abstract @Nullable Integer getDirectWriteByteLimit();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -407,6 +409,8 @@ abstract static class Builder {
 
       abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
 
+      abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
       abstract WriteRows build();
     }
 
@@ -435,6 +439,10 @@ public WriteRows withTriggeringFrequency(Duration triggeringFrequency) {
       return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
     }
 
+    public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
+      return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
+    }
+
     @Override
     public IcebergWriteResult expand(PCollection<Row> input) {
       List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -451,11 +459,20 @@ public IcebergWriteResult expand(PCollection<Row> input) {
 
       // Assign destinations before re-windowing to global in WriteToDestinations because
       // user's dynamic destination may depend on windowing properties
+      if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) {
+        Preconditions.checkArgument(
+            IcebergUtils.isUnbounded(input),
+            "Must only provide direct write limit for unbounded pipelines.");
+      }
       return input
          .apply("Assign Table Destinations", new AssignDestinations(destinations))
          .apply(
              "Write Rows to Destinations",
-              new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+              new WriteToDestinations(
+                  getCatalogConfig(),
+                  destinations,
+                  getTriggeringFrequency(),
+                  getDirectWriteByteLimit()));
     }
   }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index 0c2bc71c6f8b..f4844db8aa81 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -36,6 +36,7 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -596,4 +597,12 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
+
+  static boolean isUnbounded(PCollection<?> input) {
+    return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) {
+    return directWriteByteLimit != null && directWriteByteLimit >= 0;
+  }
 }
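Reviewer note: a rough sketch of the resulting user-facing API (table name and values are made up; writeRows, to, and withTriggeringFrequency already exist, withDirectWriteByteLimit is the method added by this change). Bundles whose estimated size reaches the limit bypass the GroupIntoBatches shuffle and are written directly to data files.

    IcebergCatalogConfig catalogConfig = ...; // hypothetical catalog config
    PCollection<Row> rows = ...;              // hypothetical unbounded input
    rows.apply(
        IcebergIO.writeRows(catalogConfig)
            .to(TableIdentifier.parse("db.events"))
            .withTriggeringFrequency(Duration.standardSeconds(30))
            .withDirectWriteByteLimit(256 * 1024 * 1024)); // 256 MiB, hypothetical value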
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index 71c898b00444..428ef71f23e5 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -95,6 +95,10 @@ public static Builder builder() {
         "For a streaming pipeline, sets the frequency at which snapshots are produced.")
     public abstract @Nullable Integer getTriggeringFrequencySeconds();
 
+    @SchemaFieldDescription(
+        "For a streaming pipeline, sets the limit for lifting bundles into the direct write path.")
+    public abstract @Nullable Integer getDirectWriteByteLimit();
+
     @SchemaFieldDescription(
         "A list of field names to keep in the input record. All other fields are dropped before writing. "
            + "Is mutually exclusive with 'drop' and 'only'.")
@@ -142,6 +146,8 @@ public abstract static class Builder {
 
       public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);
 
+      public abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
       public abstract Builder setKeep(List<String> keep);
 
       public abstract Builder setDrop(List<String> drop);
@@ -227,6 +233,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
         writeTransform = writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
       }
 
+      Integer directWriteByteLimit = configuration.getDirectWriteByteLimit();
+      if (directWriteByteLimit != null) {
+        writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
+      }
+
       // TODO: support dynamic destinations
       IcebergWriteResult result = rows.apply(writeTransform);
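Reviewer note: through the Managed/SchemaTransform surface the new field would presumably be set as below. The "direct_write_byte_limit" key name is an assumption based on the usual snake_case mapping of getDirectWriteByteLimit(); the table and other values are made up.

    Managed.write(Managed.ICEBERG)
        .withConfig(
            ImmutableMap.<String, Object>builder()
                .put("table", "db.events") // hypothetical table
                .put("triggering_frequency_seconds", 30)
                .put("direct_write_byte_limit", 256 * 1024 * 1024) // assumed key name
                .build());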
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
new file mode 100644
index 000000000000..8835e2ff628b
--- /dev/null
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.iceberg.catalog.Catalog;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+class WriteDirectRowsToFiles
+    extends PTransform<PCollection<KV<String, Row>>, PCollection<FileWriteResult>> {
+
+  private final DynamicDestinations dynamicDestinations;
+  private final IcebergCatalogConfig catalogConfig;
+  private final String filePrefix;
+  private final long maxBytesPerFile;
+
+  WriteDirectRowsToFiles(
+      IcebergCatalogConfig catalogConfig,
+      DynamicDestinations dynamicDestinations,
+      String filePrefix,
+      long maxBytesPerFile) {
+    this.catalogConfig = catalogConfig;
+    this.dynamicDestinations = dynamicDestinations;
+    this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile;
+  }
+
+  @Override
+  public PCollection<FileWriteResult> expand(PCollection<KV<String, Row>> input) {
+    return input.apply(
+        ParDo.of(
+            new WriteDirectRowsToFilesDoFn(
+                catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix)));
+  }
+
+  private static class WriteDirectRowsToFilesDoFn extends DoFn<KV<String, Row>, FileWriteResult> {
+
+    private final DynamicDestinations dynamicDestinations;
+    private final IcebergCatalogConfig catalogConfig;
+    private transient @MonotonicNonNull Catalog catalog;
+    private final String filePrefix;
+    private final long maxFileSize;
+    private transient @Nullable RecordWriterManager recordWriterManager;
+
+    WriteDirectRowsToFilesDoFn(
+        IcebergCatalogConfig catalogConfig,
+        DynamicDestinations dynamicDestinations,
+        long maxFileSize,
+        String filePrefix) {
+      this.catalogConfig = catalogConfig;
+      this.dynamicDestinations = dynamicDestinations;
+      this.filePrefix = filePrefix;
+      this.maxFileSize = maxFileSize;
+      this.recordWriterManager = null;
+    }
+
+    private org.apache.iceberg.catalog.Catalog getCatalog() {
+      if (catalog == null) {
+        this.catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      recordWriterManager =
+          new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE);
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext context,
+        @Element KV<String, Row> element,
+        BoundedWindow window,
+        PaneInfo paneInfo)
+        throws Exception {
+      String tableIdentifier = element.getKey();
+      IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
+      WindowedValue<IcebergDestination> windowedDestination =
+          WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);
+      Preconditions.checkNotNull(recordWriterManager)
+          .write(windowedDestination, element.getValue());
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context) throws Exception {
+      if (recordWriterManager == null) {
+        return;
+      }
+      recordWriterManager.close();
+
+      for (Map.Entry<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
+          destinationAndFiles :
+              Preconditions.checkNotNull(recordWriterManager)
+                  .getSerializableDataFiles()
+                  .entrySet()) {
+        WindowedValue<IcebergDestination> windowedDestination = destinationAndFiles.getKey();
+
+        for (SerializableDataFile dataFile : destinationAndFiles.getValue()) {
+          context.output(
+              FileWriteResult.builder()
+                  .setSerializableDataFile(dataFile)
+                  .setTableIdentifier(windowedDestination.getValue().getTableIdentifier())
+                  .build(),
+              windowedDestination.getTimestamp(),
+              Iterables.getFirst(windowedDestination.getWindows(), null));
+        }
+      }
+      recordWriterManager = null;
+    }
+  }
+}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 7db1ac426595..12d9570d4a38 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -36,8 +36,7 @@ class WriteGroupedRowsToFiles
     extends PTransform<
         PCollection<KV<ShardedKey<String>, Iterable<Row>>>, PCollection<FileWriteResult>> {
-
-  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
+  private final long maxBytesPerFile;
 
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
@@ -46,10 +45,12 @@ class WriteGroupedRowsToFiles
   WriteGroupedRowsToFiles(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      String filePrefix) {
+      String filePrefix,
+      long maxBytesPerFile) {
     this.catalogConfig = catalogConfig;
    this.dynamicDestinations = dynamicDestinations;
     this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile;
   }
 
   @Override
@@ -58,7 +59,7 @@ public PCollection<FileWriteResult> expand(
     return input.apply(
         ParDo.of(
             new WriteGroupedRowsToFilesDoFn(
-                catalogConfig, dynamicDestinations, DEFAULT_MAX_BYTES_PER_FILE, filePrefix)));
+                catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix)));
   }
 
   private static class WriteGroupedRowsToFilesDoFn
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
index fb3bf43f3515..bea84fc826b7 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteToDestinations.java
@@ -17,8 +17,11 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -28,6 +31,7 @@
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
@@ -36,7 +40,9 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -47,19 +53,22 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
   private static final int FILE_TRIGGERING_RECORD_COUNT = 500_000;
   // Used for auto-sharding in streaming. Limits total byte size per batch/file
   public static final int FILE_TRIGGERING_BYTE_COUNT = 1 << 30; // 1GiB
-  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
 
   private final IcebergCatalogConfig catalogConfig;
   private final DynamicDestinations dynamicDestinations;
   private final @Nullable Duration triggeringFrequency;
   private final String filePrefix;
+  private final @Nullable Integer directWriteByteLimit;
 
   WriteToDestinations(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      @Nullable Duration triggeringFrequency) {
+      @Nullable Duration triggeringFrequency,
+      @Nullable Integer directWriteByteLimit) {
     this.dynamicDestinations = dynamicDestinations;
     this.catalogConfig = catalogConfig;
     this.triggeringFrequency = triggeringFrequency;
+    this.directWriteByteLimit = directWriteByteLimit;
     // single unique prefix per write transform
     this.filePrefix = UUID.randomUUID().toString();
   }
@@ -67,10 +76,15 @@ class WriteToDestinations extends PTransform<PCollection<KV<String, Row>>, Icebe
   @Override
   public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     // Write records to files
-    PCollection<FileWriteResult> writtenFiles =
-        input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)
-            ? writeTriggered(input)
-            : writeUntriggered(input);
+    PCollection<FileWriteResult> writtenFiles;
+    if (IcebergUtils.isUnbounded(input)) {
+      writtenFiles =
+          IcebergUtils.validDirectWriteLimit(directWriteByteLimit)
+              ? writeTriggeredWithBundleLifting(input)
+              : writeTriggered(input);
+    } else {
+      writtenFiles = writeUntriggered(input);
+    }
 
     // Commit files to tables
     PCollection<KV<String, SnapshotInfo>> snapshots =
@@ -79,17 +93,12 @@ public IcebergWriteResult expand(PCollection<KV<String, Row>> input) {
     return new IcebergWriteResult(input.getPipeline(), snapshots);
   }
 
-  private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
-    checkArgumentNotNull(
-        triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
-
-    // Group records into batches to avoid writing thousands of small files
+  private PCollection<FileWriteResult> groupAndWriteRecords(PCollection<KV<String, Row>> input) {
+    // We rely on GroupIntoBatches to group and parallelize records properly,
+    // respecting our thresholds for number of records and bytes per batch.
+    // Each output batch will be written to a file.
     PCollection<KV<ShardedKey<String>, Iterable<Row>>> groupedRecords =
         input
-            .apply("WindowIntoGlobal", Window.into(new GlobalWindows()))
-            // We rely on GroupIntoBatches to group and parallelize records properly,
-            // respecting our thresholds for number of records and bytes per batch.
-            // Each output batch will be written to a file.
            .apply(
                 GroupIntoBatches.<String, Row>ofSize(FILE_TRIGGERING_RECORD_COUNT)
                     .withByteSize(FILE_TRIGGERING_BYTE_COUNT)
@@ -100,19 +109,72 @@ private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>
                 org.apache.beam.sdk.util.ShardedKey.Coder.of(StringUtf8Coder.of()),
                 IterableCoder.of(RowCoder.of(dynamicDestinations.getDataSchema()))));
 
-    return groupedRecords
-        .apply(
-            "WriteGroupedRows",
-            new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix))
-        // Respect user's triggering frequency before committing snapshots
-        .apply(
-            "ApplyUserTrigger",
-            Window.<FileWriteResult>into(new GlobalWindows())
-                .triggering(
-                    Repeatedly.forever(
-                        AfterProcessingTime.pastFirstElementInPane()
-                            .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
-                .discardingFiredPanes());
+    return groupedRecords.apply(
+        "WriteGroupedRows",
+        new WriteGroupedRowsToFiles(
+            catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+  }
+
+  private PCollection<FileWriteResult> applyUserTriggering(PCollection<FileWriteResult> input) {
+    return input.apply(
+        "ApplyUserTrigger",
+        Window.<FileWriteResult>into(new GlobalWindows())
+            .triggering(
+                Repeatedly.forever(
+                    AfterProcessingTime.pastFirstElementInPane()
+                        .plusDelayOf(checkArgumentNotNull(triggeringFrequency))))
+            .discardingFiredPanes());
+  }
+
+  private PCollection<FileWriteResult> writeTriggeredWithBundleLifting(
+      PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+    checkArgumentNotNull(
+        directWriteByteLimit, "Must set non-null directWriteByteLimit for bundle lifting.");
+
+    final TupleTag<KV<String, Row>> groupedRecordsTag = new TupleTag<>("small_batches");
+    final TupleTag<KV<String, Row>> directRecordsTag = new TupleTag<>("large_batches");
+
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollectionTuple bundleOutputs =
+        input.apply(
+            BundleLifter.of(
+                groupedRecordsTag, directRecordsTag, directWriteByteLimit, new RowSizer()));
+
+    PCollection<KV<String, Row>> smallBatches =
+        bundleOutputs
+            .get(groupedRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+    PCollection<KV<String, Row>> largeBatches =
+        bundleOutputs
+            .get(directRecordsTag)
+            .setCoder(
+                KvCoder.of(StringUtf8Coder.of(), RowCoder.of(dynamicDestinations.getDataSchema())));
+
+    PCollection<FileWriteResult> directFileWrites =
+        largeBatches.apply(
+            "WriteDirectRowsToFiles",
+            new WriteDirectRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+
+    PCollection<FileWriteResult> groupedFileWrites = groupAndWriteRecords(smallBatches);
+
+    PCollection<FileWriteResult> allFileWrites =
+        PCollectionList.of(groupedFileWrites)
+            .and(directFileWrites)
+            .apply(Flatten.pCollections());
+
+    return applyUserTriggering(allFileWrites);
+  }
+
+  private PCollection<FileWriteResult> writeTriggered(PCollection<KV<String, Row>> input) {
+    checkArgumentNotNull(
+        triggeringFrequency, "Streaming pipelines must set a triggering frequency.");
+    input = input.apply("WindowIntoGlobal", Window.into(new GlobalWindows()));
+    PCollection<FileWriteResult> files = groupAndWriteRecords(input);
+    return applyUserTriggering(files);
   }
 
   private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row>> input) {
@@ -126,7 +188,8 @@ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row
-            new WriteUngroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+            new WriteUngroupedRowsToFiles(
+                catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
 
     PCollection<FileWriteResult> writeGroupedResult =
@@ -135,10 +198,60 @@ private PCollection<FileWriteResult> writeUntriggered(PCollection<KV<String, Row
-                new WriteGroupedRowsToFiles(catalogConfig, dynamicDestinations, filePrefix));
+                new WriteGroupedRowsToFiles(
+                    catalogConfig, dynamicDestinations, filePrefix, DEFAULT_MAX_BYTES_PER_FILE));
+
+  static class RowSizer implements SerializableFunction<KV<String, Row>, Integer> {
+    @Override
+    public Integer apply(KV<String, Row> element) {
+      return estimateRowSize(element.getValue());
+    }
+
+    private int estimateRowSize(Row row) {
+      if (row == null) {
+        return 0;
+      }
+      int size = 0;
+      for (Object value : row.getValues()) {
+        size += estimateObjectSize(value);
+      }
+      return size;
+    }
+
+    private int estimateObjectSize(@Nullable Object value) {
+      if (value == null) {
+        return 0;
+      }
+      if (value instanceof String) {
+        return ((String) value).getBytes(UTF_8).length;
+      } else if (value instanceof byte[]) {
+        return ((byte[]) value).length;
+      } else if (value instanceof Row) {
+        return estimateRowSize((Row) value);
+      } else if (value instanceof List) {
+        int listSize = 0;
+        for (Object item : (List<?>) value) {
+          listSize += estimateObjectSize(item);
+        }
+        return listSize;
+      } else if (value instanceof Map) {
+        int mapSize = 0;
+        for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
+          mapSize += estimateObjectSize(entry.getKey()) + estimateObjectSize(entry.getValue());
+        }
+        return mapSize;
+      } else {
+        return 8; // Approximation for other fields
+      }
+    }
+  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index bf2a5a3535fb..1db6ede30165 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -65,8 +65,6 @@ class WriteUngroupedRowsToFiles
    */
   @VisibleForTesting static final int DEFAULT_MAX_WRITERS_PER_BUNDLE = 20;
 
-  private static final long DEFAULT_MAX_BYTES_PER_FILE = (1L << 29); // 512mb
-
   private static final TupleTag<FileWriteResult> WRITTEN_FILES_TAG = new TupleTag<>("writtenFiles");
   private static final TupleTag<Row> WRITTEN_ROWS_TAG = new TupleTag<Row>("writtenRows") {};
   private static final TupleTag<KV<ShardedKey<String>, Row>> SPILLED_ROWS_TAG =
@@ -75,14 +73,17 @@ class WriteUngroupedRowsToFiles
   private final String filePrefix;
   private final DynamicDestinations dynamicDestinations;
   private final IcebergCatalogConfig catalogConfig;
+  private final long maxBytesPerFile;
 
   WriteUngroupedRowsToFiles(
       IcebergCatalogConfig catalogConfig,
       DynamicDestinations dynamicDestinations,
-      String filePrefix) {
+      String filePrefix,
+      long maxBytesPerFile) {
     this.catalogConfig = catalogConfig;
     this.dynamicDestinations = dynamicDestinations;
     this.filePrefix = filePrefix;
+    this.maxBytesPerFile = maxBytesPerFile;
   }
 
   @Override
@@ -96,7 +97,7 @@ public Result expand(PCollection<KV<String, Row>> input) {
                     dynamicDestinations,
                     filePrefix,
                     DEFAULT_MAX_WRITERS_PER_BUNDLE,
-                    DEFAULT_MAX_BYTES_PER_FILE))
+                    maxBytesPerFile))
                 .withOutputTags(
                     WRITTEN_FILES_TAG,
                     TupleTagList.of(ImmutableList.of(WRITTEN_ROWS_TAG, SPILLED_ROWS_TAG))));
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
new file mode 100644
index 000000000000..1eaa0920e6c6
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/BundleLifterTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+
+import org.apache.beam.sdk.io.iceberg.BundleLifter.BundleLiftDoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BundleLifterTest {
+
+  private static final TupleTag<Integer> INTEGER_SMALL = new TupleTag<Integer>() {};
+  private static final TupleTag<Integer> INTEGER_LARGE = new TupleTag<Integer>() {};
+  private static final TupleTag<String> STRING_SMALL = new TupleTag<String>() {};
+  private static final TupleTag<String> STRING_LARGE = new TupleTag<String>() {};
+
+  @Test
+  public void testSmallBundle() throws Exception {
+    DoFnTester<Integer, Integer> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1));
+
+    tester.startBundle();
+    tester.processElement(1);
+    tester.processElement(2);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(INTEGER_SMALL), containsInAnyOrder(1, 2));
+    assertThat(tester.peekOutputElements(INTEGER_LARGE), empty());
+  }
+
+  @Test
+  public void testLargeBundle() throws Exception {
+    DoFnTester<Integer, Integer> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(INTEGER_SMALL, INTEGER_LARGE, 3, x -> 1));
+
+    tester.startBundle();
+    tester.processElement(1);
+    tester.processElement(2);
+    tester.processElement(3);
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(INTEGER_SMALL), empty());
+    assertThat(tester.peekOutputElements(INTEGER_LARGE), containsInAnyOrder(1, 2, 3));
+  }
+
+  @Test
+  public void testSmallBundleWithSizer() throws Exception {
+    DoFnTester<String, String> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length()));
+
+    tester.startBundle();
+    tester.processElement("123");
+    tester.processElement("456");
+    tester.processElement("789");
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(STRING_SMALL), containsInAnyOrder("123", "456", "789"));
+    assertThat(tester.peekOutputElements(STRING_LARGE), empty());
+  }
+
+  @Test
+  public void testLargeBundleWithSizer() throws Exception {
+    DoFnTester<String, String> tester =
+        DoFnTester.of(new BundleLiftDoFn<>(STRING_SMALL, STRING_LARGE, 10, e -> e.length()));
+
+    tester.startBundle();
+    tester.processElement("123");
+    tester.processElement("456");
+    tester.processElement("789");
+    tester.processElement("0");
+    tester.finishBundle();
+
+    assertThat(tester.peekOutputElements(STRING_SMALL), empty());
+    assertThat(
+        tester.peekOutputElements(STRING_LARGE), containsInAnyOrder("123", "456", "789", "0"));
+  }
+}
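Reviewer note: a hypothetical illustration (not part of this change's tests) of the RowSizer heuristic added to WriteToDestinations. Strings count their UTF-8 bytes and other scalar fields count a flat 8 bytes, so the row below is estimated at 13 bytes.

    Schema schema = Schema.builder().addStringField("name").addInt64Field("views").build();
    Row row = Row.withSchema(schema).addValues("hello", 7L).build();
    int estimate = new WriteToDestinations.RowSizer().apply(KV.of("db.events", row));
    // estimate == 13: 5 UTF-8 bytes for "hello" plus the 8-byte approximation for the long.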