From c6035795eab50a9d4a9f8b92b22505713cd2f41c Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 17 Nov 2025 06:13:49 +0000 Subject: [PATCH 1/2] Improve coder byte calculation methods Make IntervalWindowCoder::isRegisterByteSizeObserverCheap return true directly. Add getEncodedElementByteSize to PaneInfoCoder. Add registerByteSizeObserver to ValueWithRecordIdCoder. --- .../transforms/windowing/IntervalWindow.java | 3 +-- .../beam/sdk/transforms/windowing/PaneInfo.java | 17 +++++++++++++++++ .../beam/sdk/values/ValueWithRecordId.java | 11 +++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 98624d54c2e6..99382c60ce11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -186,8 +186,7 @@ public boolean consistentWithEquals() { @Override public boolean isRegisterByteSizeObserverCheap(IntervalWindow value) { - return instantCoder.isRegisterByteSizeObserverCheap(value.end) - && durationCoder.isRegisterByteSizeObserverCheap(new Duration(value.start, value.end)); + return true; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index bc83687bae4e..ac0eeb1009ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -43,6 +43,7 @@ *

Note: This does not uniquely identify a pane, and should not be used for comparisons. */ public final class PaneInfo { + /** * Enumerates the possibilities for the timing of this pane firing related to the input and output * watermarks for its computation. @@ -322,6 +323,7 @@ public String toString() { /** A Coder for encoding PaneInfo instances. */ public static class PaneInfoCoder extends AtomicCoder { + private static final byte ELEMENT_METADATA_MASK = (byte) 0x80; private enum Encoding { @@ -411,5 +413,20 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce @Override public void verifyDeterministic() {} + + @Override + protected long getEncodedElementByteSize(PaneInfo value) throws Exception { + Encoding encoding = chooseEncoding(value); + switch (encoding) { + case FIRST: + return 1; + case ONE_INDEX: + return 1L + VarInt.getLength(value.index); + case TWO_INDICES: + return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex); + default: + throw new CoderException("Unknown encoding " + encoding); + } + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index 024376691b3f..e7f0536bea9c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -40,6 +41,7 @@ */ @Internal public class ValueWithRecordId { + private final ValueT value; private final byte[] id; @@ -81,6 +83,7 @@ public int hashCode() { /** 
A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. */ public static class ValueWithRecordIdCoder extends StructuredCoder> { + public static ValueWithRecordIdCoder of(Coder valueCoder) { return new ValueWithRecordIdCoder<>(valueCoder); } @@ -124,6 +127,13 @@ public void verifyDeterministic() throws NonDeterministicException { valueCoder.verifyDeterministic(); } + @Override + public void registerByteSizeObserver( + ValueWithRecordId value, ElementByteSizeObserver observer) throws Exception { + valueCoder.registerByteSizeObserver(value.getValue(), observer); + idCoder.registerByteSizeObserver(value.getId(), observer); + } + public Coder getValueCoder() { return valueCoder; } @@ -134,6 +144,7 @@ public Coder getValueCoder() { /** {@link DoFn} to turn a {@code ValueWithRecordId} back to the value {@code T}. */ public static class StripIdsDoFn extends DoFn, T> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getValue()); From 686a92f6088293a16280c3e5fa45d846a84bc510 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 2 Dec 2025 22:10:44 +0000 Subject: [PATCH 2/2] address comments --- .../sdk/transforms/windowing/PaneInfo.java | 30 +++++++++---------- .../beam/sdk/values/ValueWithRecordId.java | 6 ++++ .../transforms/windowing/PaneInfoTest.java | 28 +++++++++++++++++ 3 files changed, 49 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index ac0eeb1009ac..f253d1794837 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -387,6 +387,21 @@ public void encode(PaneInfo value, final OutputStream outStream) } } + @Override + protected long getEncodedElementByteSize(PaneInfo value) throws Exception { + 
Encoding encoding = chooseEncoding(value); + switch (encoding) { + case FIRST: + return 1; + case ONE_INDEX: + return 1L + VarInt.getLength(value.index); + case TWO_INDICES: + return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex); + default: + throw new CoderException("Unknown encoding " + encoding); + } + } + @Override public PaneInfo decode(final InputStream inStream) throws CoderException, IOException { byte keyAndTag = (byte) inStream.read(); @@ -413,20 +428,5 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce @Override public void verifyDeterministic() {} - - @Override - protected long getEncodedElementByteSize(PaneInfo value) throws Exception { - Encoding encoding = chooseEncoding(value); - switch (encoding) { - case FIRST: - return 1; - case ONE_INDEX: - return 1L + VarInt.getLength(value.index); - case TWO_INDICES: - return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex); - default: - throw new CoderException("Unknown encoding " + encoding); - } - } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index e7f0536bea9c..93f2976eaf1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -127,6 +127,12 @@ public void verifyDeterministic() throws NonDeterministicException { valueCoder.verifyDeterministic(); } + @Override + public boolean isRegisterByteSizeObserverCheap(ValueWithRecordId value) { + // idCoder is always cheap + return valueCoder.isRegisterByteSizeObserverCheap(value.value); + } + @Override public void registerByteSizeObserver( ValueWithRecordId value, ElementByteSizeObserver observer) throws Exception { diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java index cda8ee1ea55c..e6e904289600 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertSame; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.junit.Test; @@ -52,6 +53,33 @@ public void testEncodingRoundTrip() throws Exception { } } + @Test + public void testByteCount() throws Exception { + Coder coder = PaneInfo.PaneInfoCoder.INSTANCE; + for (Coder.Context context : CoderProperties.ALL_CONTEXTS) { + for (Timing timing : Timing.values()) { + long onTimeIndex = timing == Timing.EARLY ? 
-1 : 37; + testByteCount(coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex)); + testByteCount(coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex)); + testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0)); + testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0)); + + // With metadata + testByteCount( + coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex, true)); + testByteCount( + coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex, true)); + testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0, true)); + testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0, true)); + } + } + } + + private static void testByteCount(Coder coder, Context context, PaneInfo paneInfo) + throws Exception { + CoderProperties.testByteCount(coder, context, new PaneInfo[] {paneInfo}); + } + @Test public void testEncodingRoundTripWithElementMetadata() throws Exception { Coder coder = PaneInfo.PaneInfoCoder.INSTANCE;