diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 26c4979b257f..3051536a5bed 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -472,6 +472,25 @@ public void outputWindowedValue( element.causedByDrain())); } + private void noteOutputAndObserveTimestamp(Instant timestamp) { + noteOutput(); + if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { + ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); + } + } + + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + noteOutputAndObserveTimestamp(windowedValue.getTimestamp()); + outputReceiver.output(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + noteOutputAndObserveTimestamp(windowedValue.getTimestamp()); + outputReceiver.output(tag, windowedValue); + } + private void noteOutput() { checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()"); checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()"); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 85cf9cefde15..0721ddc4685e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -376,7 +376,7 @@ public void processElements(Iterable> values) throws Excep emit( contextFactory.base(window, StateStyle.DIRECT), contextFactory.base(window, StateStyle.RENAMED), - null); + CausedByDrain.NORMAL); } // We're all done with merging and emitting elements so can compress the activeWindow state. diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 74f5a4d09001..4081746bde21 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -443,6 +443,18 @@ public void output(TupleTag tag, T output) { SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); @@ -1027,6 +1039,18 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(timestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(timestamp(), windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( @@ -1286,6 +1310,18 @@ public void outputWindowedValue( .output(); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(this.timestamp, windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(this.timestamp, windowedValue.getTimestamp()); + SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue); + } + @Override public BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 41beb93a5cbe..0d892ab12d33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowingStrategy; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -284,6 +285,10 @@ public abstract void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo); + + public abstract void outputWindowedValue(TupleTag tag, WindowedValue windowedValue); + + public abstract void outputWindowedValue(WindowedValue windowedValue); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index e2c0825e0274..6873c1792de5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -120,19 +120,9 @@ public OutputBuilder builder(T value) { @Override public void output(WindowedValue windowedValue) { if (outputTag != null) { - context.outputWindowedValue( - outputTag, - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + context.outputWindowedValue(outputTag, windowedValue); } else { - ((DoFn.WindowedContext) context) - .outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo()); + ((DoFn.WindowedContext) context).outputWindowedValue(windowedValue); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 52aea43cc662..a281f135a5dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -644,6 +644,31 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp CausedByDrain.NORMAL)); } + private void doOutputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + for (BoundedWindow w : windowedValue.getWindows()) { + getMutableOutput(tag) + .add( + ValueInSingleWindow.of( + windowedValue.getValue(), + windowedValue.getTimestamp(), + w, + windowedValue.getPaneInfo(), + windowedValue.getRecordId(), + windowedValue.getRecordOffset(), + windowedValue.causedByDrain())); + } + } + + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + doOutputWindowedValue(mainOutputTag, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + doOutputWindowedValue(tag, windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java index 5fb4ea61e8ef..44f27824382d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java @@ -187,6 +187,7 @@ public void processElement( .setTimestamp(kv.getValue().getTimestamp()) .setWindow(kv.getValue().getWindow()) .setPaneInfo(kv.getValue().getPaneInfo()) + .setCausedByDrain(kv.getValue().getCausedByDrain()) .output(); } })); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java index af125d9e63e8..0fdc5583cd3b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java @@ -141,6 +141,7 @@ public PCollection>> expand(PCollection> i new DoFn, KV>>() { @ProcessElement public void processElement( + ProcessContext pc, @Element KV element, @DoFn.Timestamp Instant timestamp, BoundedWindow window, @@ -150,7 +151,13 @@ public void processElement( KV.of( element.getKey(), ValueInSingleWindow.of( - element.getValue(), timestamp, window, paneInfo))); + element.getValue(), + timestamp, + window, + paneInfo, + pc.currentRecordId(), + pc.currentRecordOffset(), + pc.causedByDrain()))); } })) .setCoder( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 6d058b3b6ada..201040d25f16 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.checkerframework.checker.nullness.qual.Nullable; @@ -560,13 +561,7 @@ public OutputReceiver outputReceiver(DoFn doFn) { public OutputBuilder builder(OutputT value) { return outputBuilderSupplier .builder(value) - .setReceiver( - windowedValue -> - outerContext.outputWindowedValue( - windowedValue.getValue(), - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPaneInfo())); + .setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue)); } }; } @@ -659,6 +654,16 @@ public void outputWindowedValue( outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outerContext.outputWindowedValue(windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outerContext.outputWindowedValue(tag, windowedValue); + } + @Override public InputT element() { return element; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java index 30c06c5f0d9a..0d8b2f7515e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java @@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow windowedElem, OutputStream outStream, BeamFnApi.Elements.ElementMetadata.Builder builder = BeamFnApi.Elements.ElementMetadata.newBuilder(); // todo #33176 specify additional metadata in the future + builder.setDrain( + windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN + ? BeamFnApi.Elements.DrainMode.Enum.DRAINING + : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING); BeamFnApi.Elements.ElementMetadata metadata = builder.build(); ByteArrayCoder.of().encode(metadata.toByteArray(), outStream); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java index 6462de5bac97..9725c12b3708 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java @@ -77,7 +77,10 @@ public static Builder builder(WindowedValue template) { .setValue(template.getValue()) .setTimestamp(template.getTimestamp()) .setWindows(template.getWindows()) - .setPaneInfo(template.getPaneInfo()); + .setPaneInfo(template.getPaneInfo()) + .setRecordOffset(template.getRecordOffset()) + .setRecordId(template.getRecordId()) + .setCausedByDrain(template.causedByDrain()); } public static class Builder implements OutputBuilder { @@ -271,7 +274,14 @@ public static WindowedValue of( checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none"); if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of( + value, + timestamp, + windows.iterator().next(), + paneInfo, + currentRecordId, + currentRecordOffset, + causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); @@ -287,7 +297,7 @@ static WindowedValue createWithoutValidation( PaneInfo paneInfo, CausedByDrain causedByDrain) { if (windows.size() == 1) { - return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain); + return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain); } else { return new TimestampedValueInMultipleWindows<>( value, timestamp, windows, paneInfo, null, null, causedByDrain); @@ -299,7 +309,7 @@ public static WindowedValue of( T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); - return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL); + return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL); } /** Returns a {@code WindowedValue} with the given value, timestamp, and window. */ @@ -308,18 +318,22 @@ public static WindowedValue of( Instant timestamp, BoundedWindow window, PaneInfo paneInfo, + @Nullable String currentRecordId, + @Nullable Long currentRecordOffset, CausedByDrain causedByDrain) { checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null"); + checkArgument(causedByDrain != null, "WindowedValue requires causedByDrain, but it was null"); boolean isGlobal = GlobalWindow.INSTANCE.equals(window); if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { - return valueInGlobalWindow(value, paneInfo); + return new ValueInGlobalWindow<>( + value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else if (isGlobal) { return new TimestampedValueInGlobalWindow<>( - value, timestamp, paneInfo, null, null, causedByDrain); + value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } else { return new TimestampedValueInSingleWindow<>( - value, timestamp, window, paneInfo, null, null, causedByDrain); + value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain); } } @@ -848,6 +862,10 @@ public static void setMetadataSupported() { metadataSupported = true; } + public static void setMetadataNotSupported() { + metadataSupported = false; + } + public static boolean isMetadataSupported() { return metadataSupported; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java new file mode 100644 index 000000000000..f69e77ba59d2 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.CausedByDrain; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.WindowedValues; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +public class MetadataPropagationTest { + private static final Integer[] EMPTY = new Integer[] {}; + private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5}; + private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; + + @RunWith(JUnit4.class) + public static class MiscTest { + + /** Tests for metadata propagation. */ + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + static class CausedByDrainSettingDoFn extends DoFn { + @ProcessElement + public void process(OutputReceiver r) { + r.builder("value").setCausedByDrain(CausedByDrain.CAUSED_BY_DRAIN).output(); + } + } + + static class CausedByDrainExtractingDoFn extends DoFn { + @ProcessElement + public void process(ProcessContext pc, OutputReceiver r) { + r.output(pc.causedByDrain().toString()); + } + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationAcrossShuffleParameter() { + WindowedValues.WindowedValueCoder.setMetadataSupported(); + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(Redistribute.arbitrarily()) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + + @After + public void turnOffMetadata() { + WindowedValues.WindowedValueCoder.setMetadataNotSupported(); + } + + @Test + @Category(NeedsRunner.class) + public void testDefaultMetadataPropagationAcrossShuffleParameter() { + Assert.assertFalse(WindowedValues.WindowedValueCoder.isMetadataSupported()); + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(Redistribute.arbitrarily()) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("NORMAL"); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testMetadataPropagationParameter() { + PCollection results = + pipeline + .apply(Create.of(1)) + .apply(ParDo.of(new CausedByDrainSettingDoFn())) + .apply(ParDo.of(new CausedByDrainExtractingDoFn())); + + PAssert.that(results).containsInAnyOrder("CAUSED_BY_DRAIN"); + + pipeline.run(); + } + } +} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 1dfa336e35fe..056487240962 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1787,6 +1787,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentElement.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag, @@ -1860,11 +1870,6 @@ public TimerMap timerFamily(String timerFamilyId) { currentElement.getTimestamp(), currentElement.getPaneInfo()); } - - @Override - public CausedByDrain causedByDrain() { - return currentElement.causedByDrain(); - } } /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -1947,6 +1952,18 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + checkTimestamp(windowedValue.getTimestamp()); + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + checkTimestamp(windowedValue.getTimestamp()); + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public CausedByDrain causedByDrain() { return currentElement.causedByDrain(); @@ -2254,6 +2271,16 @@ public Object watermarkEstimatorState() { public WatermarkEstimator watermarkEstimator() { return currentWatermarkEstimator; } + + @Override + public CausedByDrain causedByDrain() { + return currentElement.causedByDrain(); + } + + @Override + public CausedByDrain causedByDrain(DoFn doFn) { + return currentElement.causedByDrain(); + } } /** @@ -2354,6 +2381,16 @@ public void outputWindowedValue( outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2644,6 +2681,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValues.of(output, timestamp, currentWindow, currentTimer.getPaneInfo())); } + @Override + public void outputWindowedValue(WindowedValue windowedValue) { + outputTo(mainOutputConsumer, windowedValue); + } + + @Override + public void outputWindowedValue(TupleTag tag, WindowedValue windowedValue) { + outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue); + } + @Override public void outputWindowedValue( TupleTag tag,