
Commit 4591122

Iceberg direct write (#36720)

* Iceberg direct write
* Make RowSizer compatible with Java 11.
* Fix some build issues
* Minor updates
* More efficient encoded string size calculation
* Rm extra parenthesis
* Write direct rows to files
* Address PR feedback
* Remove commented import
* Make new Iceberg util methods package private.
* Add unit test

1 parent a9e2e68 commit 4591122

9 files changed: +601, -39 lines changed
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/BundleLifter.java

Lines changed: 170 additions & 0 deletions

@@ -0,0 +1,170 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PTransform that buffers elements and outputs them to one of two TupleTags based on the total
 * size of the bundle in {@code finishBundle}.
 *
 * @param <T> The type of elements in the input PCollection.
 */
public class BundleLifter<T> extends PTransform<PCollection<T>, PCollectionTuple> {

  final TupleTag<T> smallBatchTag;
  final TupleTag<T> largeBatchTag;
  final int threshold;
  final SerializableFunction<T, Integer> elementSizer;

  /**
   * A DoFn that buffers elements within a bundle and outputs them to different tags in
   * {@code finishBundle} based on the total bundle size.
   *
   * @param <T> The type of elements being processed.
   */
  static class BundleLiftDoFn<T> extends DoFn<T, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(BundleLiftDoFn.class);

    final TupleTag<T> smallBatchTag;
    final TupleTag<T> largeBatchTag;
    final int threshold;
    final SerializableFunction<T, Integer> elementSizer;

    private transient @MonotonicNonNull List<T> buffer;
    private transient long bundleSizeBytes;
    private transient @Nullable MultiOutputReceiver receiver;

    BundleLiftDoFn(
        TupleTag<T> smallBatchTag,
        TupleTag<T> largeBatchTag,
        int threshold,
        SerializableFunction<T, Integer> elementSizer) {
      this.smallBatchTag = smallBatchTag;
      this.largeBatchTag = largeBatchTag;
      this.threshold = threshold;
      this.elementSizer = elementSizer;
    }

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
      receiver = null;
      bundleSizeBytes = 0L;
    }

    @ProcessElement
    public void processElement(@Element T element, MultiOutputReceiver mor) {
      if (receiver == null) {
        receiver = mor;
      }
      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
      buffer.add(element);
      bundleSizeBytes += elementSizer.apply(element);
    }

    @FinishBundle
    public void finishBundle() {
      checkArgumentNotNull(buffer, "Buffer should be set by startBundle.");
      if (buffer.isEmpty()) {
        return;
      }

      // Select the target tag based on the bundle size
      TupleTag<T> targetTag;
      targetTag = (bundleSizeBytes < threshold) ? smallBatchTag : largeBatchTag;
      LOG.debug(
          "Emitting {} elements of {} estimated bytes to tag: '{}'",
          buffer.size(),
          bundleSizeBytes,
          targetTag.getId());

      checkArgumentNotNull(receiver, "Receiver should be set by startBundle.");
      OutputReceiver<T> taggedOutput = receiver.get(targetTag);

      for (T element : buffer) {
        taggedOutput.output(element);
      }
    }
  }

  private BundleLifter(TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
    this(smallBatchTag, largeBatchTag, threshold, x -> 1);
  }

  private BundleLifter(
      TupleTag<T> smallBatchTag,
      TupleTag<T> largeBatchTag,
      int threshold,
      SerializableFunction<T, Integer> elementSizer) {
    if (smallBatchTag == null || largeBatchTag == null) {
      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must not be null");
    }
    if (smallBatchTag.getId().equals(largeBatchTag.getId())) {
      throw new IllegalArgumentException("smallBatchTag and largeBatchTag must be different");
    }
    if (threshold <= 0) {
      throw new IllegalArgumentException("Threshold must be a positive integer");
    }

    this.smallBatchTag = smallBatchTag;
    this.largeBatchTag = largeBatchTag;
    this.threshold = threshold;
    this.elementSizer = elementSizer;
  }

  public static <T> BundleLifter<T> of(
      TupleTag<T> smallBatchTag, TupleTag<T> largeBatchTag, int threshold) {
    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold);
  }

  public static <T> BundleLifter<T> of(
      TupleTag<T> smallBatchTag,
      TupleTag<T> largeBatchTag,
      int threshold,
      SerializableFunction<T, Integer> elementSizer) {
    return new BundleLifter<>(smallBatchTag, largeBatchTag, threshold, elementSizer);
  }

  @Override
  public PCollectionTuple expand(PCollection<T> input) {
    final TupleTag<Void> mainOutputTag = new TupleTag<Void>() {};

    return input.apply(
        "BundleLiftDoFn",
        ParDo.of(new BundleLiftDoFn<>(smallBatchTag, largeBatchTag, threshold, elementSizer))
            .withOutputTags(mainOutputTag, TupleTagList.of(smallBatchTag).and(largeBatchTag)));
  }
}
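
For orientation, here is a minimal usage sketch of the new transform (not part of the commit). The tag names, the 1 MB threshold, and the string-length sizer are illustrative assumptions; any SerializableFunction<T, Integer> can serve as the sizer, and the three-argument factory counts each element as 1.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.iceberg.BundleLifter;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

public class BundleLifterExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> input = p.apply(Create.of("a", "bb", "ccc"));

    // Two named tags; BundleLifter requires them to have distinct ids.
    TupleTag<String> small = new TupleTag<String>("small") {};
    TupleTag<String> large = new TupleTag<String>("large") {};

    // Bundles whose summed element sizes stay under 1 MB go to 'small';
    // everything else is lifted to 'large'.
    PCollectionTuple routed =
        input.apply(
            "LiftLargeBundles",
            BundleLifter.of(small, large, 1024 * 1024, s -> s.length()));

    PCollection<String> smallBundles = routed.get(small); // totals under the threshold
    PCollection<String> largeBundles = routed.get(large); // totals at or over it

    p.run().waitUntilFinish();
  }
}

Since routing happens in finishBundle, the split reflects how the runner happened to size each bundle, not any global grouping of the data.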

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java

Lines changed: 18 additions & 1 deletion

@@ -395,6 +395,8 @@ public abstract static class WriteRows extends PTransform<PCollection<Row>, Iceb
 
     abstract @Nullable Duration getTriggeringFrequency();
 
+    abstract @Nullable Integer getDirectWriteByteLimit();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -407,6 +409,8 @@ abstract static class Builder {
 
       abstract Builder setTriggeringFrequency(Duration triggeringFrequency);
 
+      abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
       abstract WriteRows build();
     }
 
@@ -435,6 +439,10 @@ public WriteRows withTriggeringFrequency(Duration triggeringFrequency) {
       return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
     }
 
+    public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) {
+      return toBuilder().setDirectWriteByteLimit(directWriteByteLimit).build();
+    }
+
     @Override
     public IcebergWriteResult expand(PCollection<Row> input) {
       List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
@@ -451,11 +459,20 @@ public IcebergWriteResult expand(PCollection<Row> input) {
 
       // Assign destinations before re-windowing to global in WriteToDestinations because
      // user's dynamic destination may depend on windowing properties
+      if (IcebergUtils.validDirectWriteLimit(getDirectWriteByteLimit())) {
+        Preconditions.checkArgument(
+            IcebergUtils.isUnbounded(input),
+            "Must only provide direct write limit for unbounded pipelines.");
+      }
       return input
           .apply("Assign Table Destinations", new AssignDestinations(destinations))
           .apply(
               "Write Rows to Destinations",
-              new WriteToDestinations(getCatalogConfig(), destinations, getTriggeringFrequency()));
+              new WriteToDestinations(
+                  getCatalogConfig(),
+                  destinations,
+                  getTriggeringFrequency(),
+                  getDirectWriteByteLimit()));
     }
   }
 
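A hedged sketch of opting in to the new option from user code (not from the commit): per the precondition above, the input PCollection must be unbounded, and the catalog builder settings, table name, and 5 MB limit below are placeholder assumptions.

import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.sdk.io.iceberg.IcebergIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.joda.time.Duration;

public class DirectWriteExample {
  // 'rows' must be an unbounded PCollection (e.g. read from Kafka or Pub/Sub).
  public static void write(PCollection<Row> rows) {
    IcebergCatalogConfig catalog =
        IcebergCatalogConfig.builder()
            .setCatalogName("example") // placeholder catalog
            .setCatalogProperties(
                ImmutableMap.of("type", "hadoop", "warehouse", "gs://bucket/warehouse"))
            .build();

    rows.apply(
        IcebergIO.writeRows(catalog)
            .to(TableIdentifier.parse("db.events")) // placeholder table
            .withTriggeringFrequency(Duration.standardSeconds(30))
            // New in this commit: bundles whose estimated size reaches this
            // limit become eligible for the direct write path.
            .withDirectWriteByteLimit(5 * 1024 * 1024));
  }
}
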
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java

Lines changed: 9 additions & 0 deletions

@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -608,4 +609,12 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
     // LocalDateTime, LocalDate, LocalTime
     return icebergValue;
   }
+
+  static <T> boolean isUnbounded(PCollection<T> input) {
+    return input.isBounded().equals(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  static boolean validDirectWriteLimit(@Nullable Integer directWriteByteLimit) {
+    return directWriteByteLimit != null && directWriteByteLimit >= 0;
+  }
 }
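
One detail worth noting in validDirectWriteLimit: the comparison is >= 0, so an explicit limit of 0 passes validation, while null simply means the feature is not configured. An illustrative check of that reading (not the commit's unit test; it sits in the same package only because the helpers are package private, and the asserts need the -ea JVM flag):

package org.apache.beam.sdk.io.iceberg;

public class DirectWriteLimitSemantics {
  public static void main(String[] args) {
    assert !IcebergUtils.validDirectWriteLimit(null); // unset: direct write disabled
    assert IcebergUtils.validDirectWriteLimit(0); // zero is accepted as a limit
    assert IcebergUtils.validDirectWriteLimit(1024 * 1024);
  }
}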

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java

Lines changed: 11 additions & 0 deletions

@@ -95,6 +95,10 @@ public static Builder builder() {
           "For a streaming pipeline, sets the frequency at which snapshots are produced.")
       public abstract @Nullable Integer getTriggeringFrequencySeconds();
 
+      @SchemaFieldDescription(
+          "For a streaming pipeline, sets the limit for lifting bundles into the direct write path.")
+      public abstract @Nullable Integer getDirectWriteByteLimit();
+
       @SchemaFieldDescription(
           "A list of field names to keep in the input record. All other fields are dropped before writing. "
               + "Is mutually exclusive with 'drop' and 'only'.")
@@ -142,6 +146,8 @@ public abstract static class Builder {
 
       public abstract Builder setTriggeringFrequencySeconds(Integer triggeringFrequencySeconds);
 
+      public abstract Builder setDirectWriteByteLimit(Integer directWriteByteLimit);
+
       public abstract Builder setKeep(List<String> keep);
 
       public abstract Builder setDrop(List<String> drop);
@@ -227,6 +233,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       writeTransform = writeTransform.withTriggeringFrequency(Duration.standardSeconds(trigFreq));
     }
 
+    Integer directWriteByteLimit = configuration.getDirectWriteByteLimit();
+    if (directWriteByteLimit != null) {
+      writeTransform = writeTransform.withDirectWriteByteLimit(directWriteByteLimit);
+    }
+
     // TODO: support dynamic destinations
     IcebergWriteResult result = rows.apply(writeTransform);
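
For completeness, a hedged sketch of reaching this SchemaTransform through Beam's Managed API. The "table", "catalog_name", and "triggering_frequency_seconds" keys are existing Managed Iceberg options; "direct_write_byte_limit" is an assumption derived from the new getter's name and is not confirmed by this commit:

import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class ManagedIcebergExample {
  public static void write(PCollection<Row> rows) {
    PCollectionRowTuple.of("input", rows)
        .apply(
            Managed.write(Managed.ICEBERG)
                .withConfig(
                    ImmutableMap.<String, Object>builder()
                        .put("table", "db.events") // placeholder table
                        .put("catalog_name", "example") // placeholder catalog
                        .put("triggering_frequency_seconds", 30)
                        // Assumed snake_case key for the new option.
                        .put("direct_write_byte_limit", 10 * 1024 * 1024)
                        .build()));
  }
}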
