Skip to content

Commit 0f7a189

Browse files
authored
Improve coder byte calculation methods (#36830)
Makes IntervalWindowCoder::isRegisterByteSizeObserverCheap directly return true. Adds getEncodedElementByteSize to PaneInfoCoder. Adds registerByteSizeObserver to ValueWithRecordIdCoder.
1 parent 7ca164c commit 0f7a189

File tree

4 files changed

+63
-2
lines changed

4 files changed

+63
-2
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -186,8 +186,7 @@ public boolean consistentWithEquals() {
186186

187187
@Override
188188
public boolean isRegisterByteSizeObserverCheap(IntervalWindow value) {
189-
return instantCoder.isRegisterByteSizeObserverCheap(value.end)
190-
&& durationCoder.isRegisterByteSizeObserverCheap(new Duration(value.start, value.end));
189+
return true;
191190
}
192191

193192
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@
4343
* <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
4444
*/
4545
public final class PaneInfo {
46+
4647
/**
4748
* Enumerates the possibilities for the timing of this pane firing related to the input and output
4849
* watermarks for its computation.
@@ -322,6 +323,7 @@ public String toString() {
322323

323324
/** A Coder for encoding PaneInfo instances. */
324325
public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {
326+
325327
private static final byte ELEMENT_METADATA_MASK = (byte) 0x80;
326328

327329
private enum Encoding {
@@ -385,6 +387,21 @@ public void encode(PaneInfo value, final OutputStream outStream)
385387
}
386388
}
387389

390+
@Override
391+
protected long getEncodedElementByteSize(PaneInfo value) throws Exception {
392+
Encoding encoding = chooseEncoding(value);
393+
switch (encoding) {
394+
case FIRST:
395+
return 1;
396+
case ONE_INDEX:
397+
return 1L + VarInt.getLength(value.index);
398+
case TWO_INDICES:
399+
return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex);
400+
default:
401+
throw new CoderException("Unknown encoding " + encoding);
402+
}
403+
}
404+
388405
@Override
389406
public PaneInfo decode(final InputStream inStream) throws CoderException, IOException {
390407
byte keyAndTag = (byte) inStream.read();

sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
import org.apache.beam.sdk.coders.Coder;
2929
import org.apache.beam.sdk.coders.StructuredCoder;
3030
import org.apache.beam.sdk.transforms.DoFn;
31+
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
3132
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
3233
import org.checkerframework.checker.nullness.qual.Nullable;
3334

@@ -40,6 +41,7 @@
4041
*/
4142
@Internal
4243
public class ValueWithRecordId<ValueT> {
44+
4345
private final ValueT value;
4446
private final byte[] id;
4547

@@ -81,6 +83,7 @@ public int hashCode() {
8183
/** A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. */
8284
public static class ValueWithRecordIdCoder<ValueT>
8385
extends StructuredCoder<ValueWithRecordId<ValueT>> {
86+
8487
public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
8588
return new ValueWithRecordIdCoder<>(valueCoder);
8689
}
@@ -124,6 +127,19 @@ public void verifyDeterministic() throws NonDeterministicException {
124127
valueCoder.verifyDeterministic();
125128
}
126129

130+
@Override
131+
public boolean isRegisterByteSizeObserverCheap(ValueWithRecordId<ValueT> value) {
132+
// idCoder is always cheap
133+
return valueCoder.isRegisterByteSizeObserverCheap(value.value);
134+
}
135+
136+
@Override
137+
public void registerByteSizeObserver(
138+
ValueWithRecordId<ValueT> value, ElementByteSizeObserver observer) throws Exception {
139+
valueCoder.registerByteSizeObserver(value.getValue(), observer);
140+
idCoder.registerByteSizeObserver(value.getId(), observer);
141+
}
142+
127143
public Coder<ValueT> getValueCoder() {
128144
return valueCoder;
129145
}
@@ -134,6 +150,7 @@ public Coder<ValueT> getValueCoder() {
134150

135151
/** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
136152
public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
153+
137154
@ProcessElement
138155
public void processElement(ProcessContext c) {
139156
c.output(c.element().getValue());

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java

Lines changed: 28 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.Assert.assertSame;
2222

2323
import org.apache.beam.sdk.coders.Coder;
24+
import org.apache.beam.sdk.coders.Coder.Context;
2425
import org.apache.beam.sdk.testing.CoderProperties;
2526
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
2627
import org.junit.Test;
@@ -52,6 +53,33 @@ public void testEncodingRoundTrip() throws Exception {
5253
}
5354
}
5455

56+
@Test
57+
public void testByteCount() throws Exception {
58+
Coder<PaneInfo> coder = PaneInfo.PaneInfoCoder.INSTANCE;
59+
for (Coder.Context context : CoderProperties.ALL_CONTEXTS) {
60+
for (Timing timing : Timing.values()) {
61+
long onTimeIndex = timing == Timing.EARLY ? -1 : 37;
62+
testByteCount(coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex));
63+
testByteCount(coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex));
64+
testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0));
65+
testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0));
66+
67+
// With metadata
68+
testByteCount(
69+
coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex, true));
70+
testByteCount(
71+
coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex, true));
72+
testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0, true));
73+
testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0, true));
74+
}
75+
}
76+
}
77+
78+
private static void testByteCount(Coder<PaneInfo> coder, Context context, PaneInfo paneInfo)
79+
throws Exception {
80+
CoderProperties.testByteCount(coder, context, new PaneInfo[] {paneInfo});
81+
}
82+
5583
@Test
5684
public void testEncodingRoundTripWithElementMetadata() throws Exception {
5785
Coder<PaneInfo> coder = PaneInfo.PaneInfoCoder.INSTANCE;

0 commit comments

Comments
 (0)