diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index e600e34657e7..f73f34a500e1 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1677,6 +1677,9 @@ message StandardProtocols { // Indicates whether the SDK supports ordered list state. ORDERED_LIST_STATE = 10 [(beam_urn) = "beam:protocol:ordered_list_state:v1"]; + + EXTENDED_ELEMENT_METADATA = 11 + [(beam_urn) = "beam:protocol:extended_element_metadata:v1"]; } } @@ -2032,3 +2035,14 @@ message StandardResourceHints { MAX_ACTIVE_BUNDLES_PER_WORKER = 3 [(beam_urn) = "beam:resources:max_active_bundles_per_worker:v1"]; } } +message DrainMode { + enum Enum { + DRAIN_MODE_UNSPECIFIED = 0; + DRAIN_MODE_NOT_DRAINING = 1; + DRAIN_MODE_DRAINING = 2; + } +} + +message ElementMetadata { + optional DrainMode.Enum drain = 1; +} \ No newline at end of file diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 6af6c499cfad..04cc7a109213 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -63,6 +63,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.ElementMetadata; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; @@ -1364,9 +1365,16 @@ private static WindowedValue valueInEmptyWindows(T value) { private static class ValueInEmptyWindows extends WindowedValue { private final T value; + private final @Nullable ElementMetadata elementMetadata; private ValueInEmptyWindows(T value) { this.value = value; + this.elementMetadata = null; + } + + private ValueInEmptyWindows(T value, @Nullable ElementMetadata elementMetadata) { + this.value = value; + this.elementMetadata = elementMetadata; } @Override @@ -1374,6 +1382,11 @@ public WindowedValue withValue(NewT value) { return new ValueInEmptyWindows<>(value); } + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new ValueInEmptyWindows<>(this.getValue(), elementMetadata); + } + @Override public T getValue() { return value; @@ -1394,6 +1407,11 @@ public PaneInfo getPane() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable ElementMetadata getElementMetadata() { + return elementMetadata; + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()).add("value", getValue()).toString(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java index 5084af0d187e..3ffadb1dd72a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java @@ -22,6 +22,7 @@ import java.util.Objects; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.ElementMetadata; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -39,9 +40,15 @@ */ public class ValueInEmptyWindows extends WindowedValue { private final T value; + private final @Nullable ElementMetadata elementMetadata; public ValueInEmptyWindows(T value) { + this(value, null); + } + + public ValueInEmptyWindows(T value, @Nullable ElementMetadata elementMetadata) { this.value = value; + this.elementMetadata = elementMetadata; } @Override @@ -49,6 +56,11 @@ public PaneInfo getPane() { return PaneInfo.NO_FIRING; } + @Override + public @Nullable ElementMetadata getElementMetadata() { + return elementMetadata; + } + @Override public T getValue() { return value; @@ -59,6 +71,11 @@ public WindowedValue withValue(NewT newValue) { return new ValueInEmptyWindows<>(newValue); } + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new ValueInEmptyWindows<>(this.getValue(), elementMetadata); + } + @Override public Instant getTimestamp() { return BoundedWindow.TIMESTAMP_MIN_VALUE; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 6e4c694d48e3..2083f9794d4a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -146,10 +146,10 @@ private static byte encodedByte(boolean isFirst, boolean isLast, Timing timing) ImmutableMap.Builder decodingBuilder = ImmutableMap.builder(); for (Timing timing : Timing.values()) { long onTimeIndex = timing == Timing.EARLY ? -1 : 0; - register(decodingBuilder, new PaneInfo(true, true, timing, 0, onTimeIndex)); - register(decodingBuilder, new PaneInfo(true, false, timing, 0, onTimeIndex)); - register(decodingBuilder, new PaneInfo(false, true, timing, -1, onTimeIndex)); - register(decodingBuilder, new PaneInfo(false, false, timing, -1, onTimeIndex)); + register(decodingBuilder, new PaneInfo(true, true, timing, 0, onTimeIndex, false)); + register(decodingBuilder, new PaneInfo(true, false, timing, 0, onTimeIndex, false)); + register(decodingBuilder, new PaneInfo(false, true, timing, -1, onTimeIndex, false)); + register(decodingBuilder, new PaneInfo(false, false, timing, -1, onTimeIndex, false)); } BYTE_TO_PANE_INFO = decodingBuilder.build(); } @@ -159,7 +159,7 @@ private static void register(ImmutableMap.Builder builder, PaneI } private final byte encodedByte; - + private final boolean containsElementMetadata; private final boolean isFirst; private final boolean isLast; private final Timing timing; @@ -177,13 +177,20 @@ private static void register(ImmutableMap.Builder builder, PaneI public static final PaneInfo ON_TIME_AND_ONLY_FIRING = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); - private PaneInfo(boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) { + private PaneInfo( + boolean isFirst, + boolean isLast, + Timing timing, + long index, + long onTimeIndex, + boolean containsElementMetadata) { this.encodedByte = encodedByte(isFirst, isLast, timing); this.isFirst = isFirst; this.isLast = isLast; this.timing = timing; this.index = index; this.nonSpeculativeIndex = onTimeIndex; + this.containsElementMetadata = containsElementMetadata; } public static PaneInfo createPane(boolean isFirst, boolean isLast, Timing timing) { @@ -194,10 +201,21 @@ public static PaneInfo createPane(boolean isFirst, boolean isLast, Timing timing /** Factory method to create a {@link PaneInfo} with the specified parameters. */ public static PaneInfo createPane( boolean isFirst, boolean isLast, Timing timing, long index, long onTimeIndex) { + return createPane(isFirst, isLast, timing, index, onTimeIndex, false); + } + + /** Factory method to create a {@link PaneInfo} with the specified parameters. */ + public static PaneInfo createPane( + boolean isFirst, + boolean isLast, + Timing timing, + long index, + long onTimeIndex, + boolean containsElementMetadata) { if (isFirst || timing == Timing.UNKNOWN) { return checkNotNull(BYTE_TO_PANE_INFO.get(encodedByte(isFirst, isLast, timing))); } else { - return new PaneInfo(isFirst, isLast, timing, index, onTimeIndex); + return new PaneInfo(isFirst, isLast, timing, index, onTimeIndex, containsElementMetadata); } } @@ -241,6 +259,15 @@ public long getIndex() { return index; } + public boolean isElementMetadata() { + return containsElementMetadata; + } + + public PaneInfo withElementMetadata(boolean elementMetadata) { + return new PaneInfo( + this.isFirst, this.isLast, this.timing, index, nonSpeculativeIndex, elementMetadata); + } + /** * The zero-based index of this trigger firing among non-speculative panes. * @@ -295,6 +322,8 @@ public String toString() { /** A Coder for encoding PaneInfo instances. */ public static class PaneInfoCoder extends AtomicCoder { + private static final byte ELEMENT_METADATA_MASK = (byte) 0x80; + private enum Encoding { FIRST, ONE_INDEX, @@ -337,16 +366,17 @@ private PaneInfoCoder() {} public void encode(PaneInfo value, final OutputStream outStream) throws CoderException, IOException { Encoding encoding = chooseEncoding(value); + byte elementMetadata = value.containsElementMetadata ? ELEMENT_METADATA_MASK : 0x00; switch (chooseEncoding(value)) { case FIRST: - outStream.write(value.encodedByte); + outStream.write(value.encodedByte | elementMetadata); break; case ONE_INDEX: - outStream.write(value.encodedByte | encoding.tag); + outStream.write(value.encodedByte | encoding.tag | elementMetadata); VarInt.encode(value.index, outStream); break; case TWO_INDICES: - outStream.write(value.encodedByte | encoding.tag); + outStream.write(value.encodedByte | encoding.tag | elementMetadata); VarInt.encode(value.index, outStream); VarInt.encode(value.nonSpeculativeIndex, outStream); break; @@ -360,9 +390,10 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce byte keyAndTag = (byte) inStream.read(); PaneInfo base = Preconditions.checkNotNull(BYTE_TO_PANE_INFO.get((byte) (keyAndTag & 0x0F))); long index, onTimeIndex; - switch (Encoding.fromTag(keyAndTag)) { + boolean elementMetadata = (keyAndTag & ELEMENT_METADATA_MASK) != 0; + switch (Encoding.fromTag((byte) (keyAndTag & ~ELEMENT_METADATA_MASK))) { case FIRST: - return base; + return base.withElementMetadata(elementMetadata); case ONE_INDEX: index = VarInt.decodeLong(inStream); onTimeIndex = base.timing == Timing.EARLY ? -1 : index; @@ -374,7 +405,8 @@ public PaneInfo decode(final InputStream inStream) throws CoderException, IOExce default: throw new CoderException("Unknown encoding " + (keyAndTag & 0xF0)); } - return new PaneInfo(base.isFirst, base.isLast, base.timing, index, onTimeIndex); + return new PaneInfo( + base.isFirst, base.isLast, base.timing, index, onTimeIndex, elementMetadata); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ElementMetadata.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ElementMetadata.java new file mode 100644 index 000000000000..f162c79eeaca --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ElementMetadata.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.value.AutoValue; +import org.apache.beam.model.pipeline.v1.RunnerApi.DrainMode; +import org.checkerframework.checker.nullness.qual.Nullable; + +@AutoValue +public abstract class ElementMetadata { + + DrainMode.@Nullable Enum drainMode; + + public static ElementMetadata create(DrainMode.@Nullable Enum drainMode) { + return new AutoValue_ElementMetadata(drainMode); + } + + abstract DrainMode.@Nullable Enum drainMode(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index d11166001f05..8ae7ae26cb10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -61,40 +62,65 @@ public abstract class WindowedValue { /** Returns a {@code WindowedValue} with the given value, timestamp, and windows. */ public static WindowedValue of( - T value, Instant timestamp, Collection windows, PaneInfo pane) { + T value, + Instant timestamp, + Collection windows, + PaneInfo pane, + @Nullable ElementMetadata elementMetadata) { checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null"); checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), pane); + return of(value, timestamp, windows.iterator().next(), pane, elementMetadata); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, pane, elementMetadata); } } + public static WindowedValue of( + T value, Instant timestamp, Collection windows, PaneInfo pane) { + return of(value, timestamp, windows, pane, null); + } + /** @deprecated for use only in compatibility with old broken code */ @Deprecated static WindowedValue createWithoutValidation( - T value, Instant timestamp, Collection windows, PaneInfo pane) { + T value, + Instant timestamp, + Collection windows, + PaneInfo pane, + @Nullable ElementMetadata elementMetadata) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), pane); + return of(value, timestamp, windows.iterator().next(), pane, elementMetadata); } else { - return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); + return new TimestampedValueInMultipleWindows<>( + value, timestamp, windows, pane, elementMetadata); } } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { + return of(value, timestamp, window, pane, null); + } + + public static WindowedValue of( + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo pane, + @Nullable ElementMetadata elementMetadata) { + checkArgument(pane != null, "WindowedValue requires PaneInfo, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { return valueInGlobalWindow(value, pane); } else if (isGlobal) { - return new TimestampedValueInGlobalWindow<>(value, timestamp, pane); + return new TimestampedValueInGlobalWindow<>(value, timestamp, pane, elementMetadata); } else { - return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane); + return new TimestampedValueInSingleWindow<>(value, timestamp, window, pane, elementMetadata); } } @@ -145,6 +171,8 @@ public static WindowedValue timestampedValueInGlobalWindow( */ public abstract WindowedValue withValue(NewT value); + public abstract WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata); + /** Returns the value of this {@code WindowedValue}. */ public abstract T getValue(); @@ -157,6 +185,8 @@ public static WindowedValue timestampedValueInGlobalWindow( /** Returns the pane of this {@code WindowedValue} in its window. */ public abstract PaneInfo getPane(); + public abstract @Nullable ElementMetadata getElementMetadata(); + /** Returns {@code true} if this WindowedValue has exactly one window. */ public boolean isSingleWindowedValue() { return false; @@ -221,10 +251,13 @@ private abstract static class SimpleWindowedValue extends WindowedValue { private final T value; private final PaneInfo pane; + private final @Nullable ElementMetadata elementMetadata; - protected SimpleWindowedValue(T value, PaneInfo pane) { + protected SimpleWindowedValue( + T value, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { this.value = value; this.pane = checkNotNull(pane); + this.elementMetadata = elementMetadata; } @Override @@ -232,6 +265,11 @@ public PaneInfo getPane() { return pane; } + @Override + public @Nullable ElementMetadata getElementMetadata() { + return elementMetadata; + } + @Override public T getValue() { return value; @@ -240,8 +278,9 @@ public T getValue() { /** The abstract superclass of WindowedValue representations where timestamp == MIN. */ private abstract static class MinTimestampWindowedValue extends SimpleWindowedValue { - public MinTimestampWindowedValue(T value, PaneInfo pane) { - super(value, pane); + public MinTimestampWindowedValue( + T value, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { + super(value, pane, elementMetadata); } @Override @@ -254,13 +293,22 @@ public Instant getTimestamp() { private static class ValueInGlobalWindow extends MinTimestampWindowedValue implements SingleWindowedValue { + public ValueInGlobalWindow(T value, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { + super(value, pane, elementMetadata); + } + public ValueInGlobalWindow(T value, PaneInfo pane) { - super(value, pane); + this(value, pane, null); } @Override public WindowedValue withValue(NewT newValue) { - return new ValueInGlobalWindow<>(newValue, getPane()); + return new ValueInGlobalWindow<>(newValue, getPane(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new ValueInGlobalWindow<>(getValue(), getPane(), elementMetadata); } @Override @@ -307,8 +355,9 @@ public String toString() { private abstract static class TimestampedWindowedValue extends SimpleWindowedValue { private final Instant timestamp; - public TimestampedWindowedValue(T value, Instant timestamp, PaneInfo pane) { - super(value, pane); + public TimestampedWindowedValue( + T value, Instant timestamp, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { + super(value, pane, elementMetadata); this.timestamp = checkNotNull(timestamp); } @@ -325,13 +374,25 @@ public Instant getTimestamp() { private static class TimestampedValueInGlobalWindow extends TimestampedWindowedValue implements SingleWindowedValue { + public TimestampedValueInGlobalWindow( + T value, Instant timestamp, PaneInfo pane, @Nullable ElementMetadata elementMetadata) { + super(value, timestamp, pane, elementMetadata); + } + public TimestampedValueInGlobalWindow(T value, Instant timestamp, PaneInfo pane) { - super(value, timestamp, pane); + this(value, timestamp, pane, null); } @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInGlobalWindow<>(newValue, getTimestamp(), getPane()); + return new TimestampedValueInGlobalWindow<>( + newValue, getTimestamp(), getPane(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInGlobalWindow<>( + getValue(), getTimestamp(), getPane(), elementMetadata); } @Override @@ -390,14 +451,25 @@ private static class TimestampedValueInSingleWindow extends TimestampedWindow private final BoundedWindow window; public TimestampedValueInSingleWindow( - T value, Instant timestamp, BoundedWindow window, PaneInfo pane) { - super(value, timestamp, pane); + T value, + Instant timestamp, + BoundedWindow window, + PaneInfo pane, + @Nullable ElementMetadata elementMetadata) { + super(value, timestamp, pane, elementMetadata); this.window = checkNotNull(window); } @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInSingleWindow<>(newValue, getTimestamp(), window, getPane()); + return new TimestampedValueInSingleWindow<>( + newValue, getTimestamp(), window, getPane(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInSingleWindow<>( + getValue(), getTimestamp(), getWindow(), getPane(), elementMetadata); } @Override @@ -453,14 +525,25 @@ private static class TimestampedValueInMultipleWindows extends TimestampedWin private Collection windows; public TimestampedValueInMultipleWindows( - T value, Instant timestamp, Collection windows, PaneInfo pane) { - super(value, timestamp, pane); + T value, + Instant timestamp, + Collection windows, + PaneInfo pane, + @Nullable ElementMetadata elementMetadata) { + super(value, timestamp, pane, elementMetadata); this.windows = checkNotNull(windows); } @Override public WindowedValue withValue(NewT newValue) { - return new TimestampedValueInMultipleWindows<>(newValue, getTimestamp(), windows, getPane()); + return new TimestampedValueInMultipleWindows<>( + newValue, getTimestamp(), windows, getPane(), getElementMetadata()); + } + + @Override + public WindowedValue withElementMetadata(@Nullable ElementMetadata elementMetadata) { + return new TimestampedValueInMultipleWindows<>( + getValue(), getTimestamp(), getWindows(), getPane(), elementMetadata); } @Override @@ -603,7 +686,18 @@ public void encode(WindowedValue windowedElem, OutputStream outStream, Contex throws CoderException, IOException { InstantCoder.of().encode(windowedElem.getTimestamp(), outStream); windowsCoder.encode(windowedElem.getWindows(), outStream); - PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream); + ElementMetadata elementMetadata = windowedElem.getElementMetadata(); + PaneInfoCoder.INSTANCE.encode( + windowedElem.getPane().withElementMetadata(elementMetadata != null), outStream); + if (elementMetadata != null) { + RunnerApi.ElementMetadata.Builder builder = RunnerApi.ElementMetadata.newBuilder(); + RunnerApi.DrainMode.Enum drainMode = elementMetadata.drainMode(); + if (drainMode != null) { + builder.setDrain(drainMode); + } + RunnerApi.ElementMetadata em = builder.build(); + em.writeDelimitedTo(outStream); + } valueCoder.encode(windowedElem.getValue(), outStream, context); } @@ -618,11 +712,18 @@ public WindowedValue decode(InputStream inStream, Context context) Instant timestamp = InstantCoder.of().decode(inStream); Collection windows = windowsCoder.decode(inStream); PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream); + ElementMetadata elementMetadata = null; + if (pane.isElementMetadata()) { + RunnerApi.ElementMetadata metadata = RunnerApi.ElementMetadata.parseDelimitedFrom(inStream); + RunnerApi.DrainMode.Enum drain = metadata.getDrain(); + elementMetadata = ElementMetadata.create(metadata.hasDrain() ? drain : null); + } T value = valueCoder.decode(inStream, context); // Because there are some remaining (incorrect) uses of WindowedValue with no windows, // we call this deprecated no-validation path when decoding - return WindowedValue.createWithoutValidation(value, timestamp, windows, pane); + return WindowedValue.createWithoutValidation( + value, timestamp, windows, pane, elementMetadata); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java index 9c47e1a59f77..80a5efa7cc2b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java @@ -486,6 +486,7 @@ public static Set getJavaCapabilities() { capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.DATA_SAMPLING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.SDK_CONSUMING_RECEIVED_DATA)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.ORDERED_LIST_STATE)); + capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.EXTENDED_ELEMENT_METADATA)); return capabilities.build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java index 946deba036db..cda8ee1ea55c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java @@ -52,6 +52,22 @@ public void testEncodingRoundTrip() throws Exception { } } + @Test + public void testEncodingRoundTripWithElementMetadata() throws Exception { + Coder coder = PaneInfo.PaneInfoCoder.INSTANCE; + for (Timing timing : Timing.values()) { + long onTimeIndex = timing == Timing.EARLY ? -1 : 37; + CoderProperties.coderDecodeEncodeEqual( + coder, PaneInfo.createPane(false, false, timing, 389, onTimeIndex, true)); + CoderProperties.coderDecodeEncodeEqual( + coder, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex, true)); + CoderProperties.coderDecodeEncodeEqual( + coder, PaneInfo.createPane(true, false, timing, 0, 0, true)); + CoderProperties.coderDecodeEncodeEqual( + coder, PaneInfo.createPane(true, true, timing, 0, 0, true)); + } + } + @Test public void testEncodings() { assertEquals( @@ -82,5 +98,9 @@ public void testEncodings() { "PaneInfo encoding should remain the same.", 0xF, PaneInfo.createPane(true, true, Timing.UNKNOWN).getEncodedByte()); + assertEquals( + "PaneInfo encoding should remain the same.", + 0x1, + PaneInfo.createPane(true, false, Timing.EARLY, 1, -1, true).getEncodedByte()); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index 0a18f076762a..2776444a6747 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo; import java.util.Arrays; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -73,6 +74,35 @@ public void testWindowedValueCoder() throws CoderException { Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); } + @Test + public void testWindowedValueWithElementMetadataCoder() throws CoderException { + Instant timestamp = new Instant(1234); + WindowedValue value = + WindowedValue.of( + "abc", + new Instant(1234), + Arrays.asList( + new IntervalWindow(timestamp, timestamp.plus(Duration.millis(1000))), + new IntervalWindow( + timestamp.plus(Duration.millis(1000)), timestamp.plus(Duration.millis(2000)))), + PaneInfo.NO_FIRING, + ElementMetadata.create(RunnerApi.DrainMode.Enum.DRAIN_MODE_DRAINING)); + + Coder> windowedValueCoder = + WindowedValue.getFullCoder(StringUtf8Coder.of(), IntervalWindow.getCoder()); + + byte[] encodedValue = CoderUtils.encodeToByteArray(windowedValueCoder, value); + WindowedValue decodedValue = + CoderUtils.decodeFromByteArray(windowedValueCoder, encodedValue); + + Assert.assertEquals(value.getValue(), decodedValue.getValue()); + Assert.assertEquals(value.getTimestamp(), decodedValue.getTimestamp()); + Assert.assertArrayEquals(value.getWindows().toArray(), decodedValue.getWindows().toArray()); + ElementMetadata elementMetadata = decodedValue.getElementMetadata(); + Assert.assertNotNull(elementMetadata); + Assert.assertEquals(RunnerApi.DrainMode.Enum.DRAIN_MODE_DRAINING, elementMetadata.drainMode()); + } + @Test public void testFullWindowedValueCoderIsSerializableWithWellKnownCoderType() { CoderProperties.coderSerializable(