Skip to content

Commit 296350a

Browse files
committed
- adds new outputWindowedValue(WV) public interface with implementations
- add tests to cover metadata propagation across parDos in single and mulitple stages for direct runner - plumbs metadata propagation, where it was lost previously.
1 parent 87d7bba commit 296350a

File tree

12 files changed

+273
-32
lines changed

12 files changed

+273
-32
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,26 @@ public <T> void outputWindowedValue(
472472
element.causedByDrain()));
473473
}
474474

475+
@Override
476+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
477+
noteOutput();
478+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
479+
((TimestampObservingWatermarkEstimator) watermarkEstimator)
480+
.observeTimestamp(windowedValue.getTimestamp());
481+
}
482+
outputReceiver.output(mainOutputTag, windowedValue);
483+
}
484+
485+
@Override
486+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
487+
noteOutput();
488+
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
489+
((TimestampObservingWatermarkEstimator) watermarkEstimator)
490+
.observeTimestamp(windowedValue.getTimestamp());
491+
}
492+
outputReceiver.output(tag, windowedValue);
493+
}
494+
475495
private void noteOutput() {
476496
checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()");
477497
checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()");

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,16 @@ public <T> void output(TupleTag<T> tag, T output) {
443443
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
444444
}
445445

446+
@Override
447+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448+
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
449+
}
450+
451+
@Override
452+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
453+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
454+
}
455+
446456
@Override
447457
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
448458
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
@@ -1027,6 +1037,16 @@ public <T> void outputWindowedValue(
10271037
.output();
10281038
}
10291039

1040+
@Override
1041+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1042+
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
1043+
}
1044+
1045+
@Override
1046+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1047+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
1048+
}
1049+
10301050
@Override
10311051
public BundleFinalizer bundleFinalizer() {
10321052
throw new UnsupportedOperationException(
@@ -1286,6 +1306,16 @@ public <T> void outputWindowedValue(
12861306
.output();
12871307
}
12881308

1309+
@Override
1310+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1311+
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
1312+
}
1313+
1314+
@Override
1315+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1316+
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
1317+
}
1318+
12891319
@Override
12901320
public BundleFinalizer bundleFinalizer() {
12911321
throw new UnsupportedOperationException(

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.beam.sdk.values.Row;
5252
import org.apache.beam.sdk.values.TupleTag;
5353
import org.apache.beam.sdk.values.TypeDescriptor;
54+
import org.apache.beam.sdk.values.WindowedValue;
5455
import org.apache.beam.sdk.values.WindowingStrategy;
5556
import org.checkerframework.checker.nullness.qual.Nullable;
5657
import org.checkerframework.dataflow.qual.Pure;
@@ -284,6 +285,10 @@ public abstract <T> void outputWindowedValue(
284285
Instant timestamp,
285286
Collection<? extends BoundedWindow> windows,
286287
PaneInfo paneInfo);
288+
289+
public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue);
290+
291+
public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue);
287292
}
288293

289294
/** Information accessible when running a {@link DoFn.ProcessElement} method. */

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -120,19 +120,9 @@ public OutputBuilder<T> builder(T value) {
120120
@Override
121121
public void output(WindowedValue<T> windowedValue) {
122122
if (outputTag != null) {
123-
context.outputWindowedValue(
124-
outputTag,
125-
windowedValue.getValue(),
126-
windowedValue.getTimestamp(),
127-
windowedValue.getWindows(),
128-
windowedValue.getPaneInfo());
123+
context.outputWindowedValue(outputTag, windowedValue);
129124
} else {
130-
((DoFn<?, T>.WindowedContext) context)
131-
.outputWindowedValue(
132-
windowedValue.getValue(),
133-
windowedValue.getTimestamp(),
134-
windowedValue.getWindows(),
135-
windowedValue.getPaneInfo());
125+
((DoFn<?, T>.WindowedContext) context).outputWindowedValue(windowedValue);
136126
}
137127
}
138128
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,38 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
644644
CausedByDrain.NORMAL));
645645
}
646646

647+
@Override
648+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
649+
for (BoundedWindow w : windowedValue.getWindows()) {
650+
getMutableOutput(mainOutputTag)
651+
.add(
652+
ValueInSingleWindow.of(
653+
windowedValue.getValue(),
654+
windowedValue.getTimestamp(),
655+
w,
656+
windowedValue.getPaneInfo(),
657+
windowedValue.getRecordId(),
658+
windowedValue.getRecordOffset(),
659+
windowedValue.causedByDrain()));
660+
}
661+
}
662+
663+
@Override
664+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
665+
for (BoundedWindow w : windowedValue.getWindows()) {
666+
getMutableOutput(tag)
667+
.add(
668+
ValueInSingleWindow.of(
669+
windowedValue.getValue(),
670+
windowedValue.getTimestamp(),
671+
w,
672+
windowedValue.getPaneInfo(),
673+
windowedValue.getRecordId(),
674+
windowedValue.getRecordOffset(),
675+
windowedValue.causedByDrain()));
676+
}
677+
}
678+
647679
@Override
648680
public <T> void outputWindowedValue(
649681
TupleTag<T> tag,

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Redistribute.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ public void processElement(
187187
.setTimestamp(kv.getValue().getTimestamp())
188188
.setWindow(kv.getValue().getWindow())
189189
.setPaneInfo(kv.getValue().getPaneInfo())
190+
.setCausedByDrain(kv.getValue().getCausedByDrain())
190191
.output();
191192
}
192193
}));

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> i
141141
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
142142
@ProcessElement
143143
public void processElement(
144+
ProcessContext pc,
144145
@Element KV<K, V> element,
145146
@DoFn.Timestamp Instant timestamp,
146147
BoundedWindow window,
@@ -150,7 +151,13 @@ public void processElement(
150151
KV.of(
151152
element.getKey(),
152153
ValueInSingleWindow.of(
153-
element.getValue(), timestamp, window, paneInfo)));
154+
element.getValue(),
155+
timestamp,
156+
window,
157+
paneInfo,
158+
pc.currentRecordId(),
159+
pc.currentRecordOffset(),
160+
pc.causedByDrain())));
154161
}
155162
}))
156163
.setCoder(

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.beam.sdk.values.PCollectionView;
5757
import org.apache.beam.sdk.values.Row;
5858
import org.apache.beam.sdk.values.TupleTag;
59+
import org.apache.beam.sdk.values.WindowedValue;
5960
import org.apache.beam.sdk.values.WindowedValues;
6061
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
6162
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -560,13 +561,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
560561
public OutputBuilder<OutputT> builder(OutputT value) {
561562
return outputBuilderSupplier
562563
.builder(value)
563-
.setReceiver(
564-
windowedValue ->
565-
outerContext.outputWindowedValue(
566-
windowedValue.getValue(),
567-
windowedValue.getTimestamp(),
568-
windowedValue.getWindows(),
569-
windowedValue.getPaneInfo()));
564+
.setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue));
570565
}
571566
};
572567
}
@@ -659,6 +654,16 @@ public <T> void outputWindowedValue(
659654
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
660655
}
661656

657+
@Override
658+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
659+
outerContext.outputWindowedValue(windowedValue);
660+
}
661+
662+
@Override
663+
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
664+
outerContext.outputWindowedValue(tag, windowedValue);
665+
}
666+
662667
@Override
663668
public InputT element() {
664669
return element;

sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
123123
BeamFnApi.Elements.ElementMetadata.Builder builder =
124124
BeamFnApi.Elements.ElementMetadata.newBuilder();
125125
// todo #33176 specify additional metadata in the future
126+
builder.setDrain(
127+
windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
128+
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
129+
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
126130
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
127131
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
128132
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ public static <T> Builder<T> builder(WindowedValue<T> template) {
7777
.setValue(template.getValue())
7878
.setTimestamp(template.getTimestamp())
7979
.setWindows(template.getWindows())
80-
.setPaneInfo(template.getPaneInfo());
80+
.setPaneInfo(template.getPaneInfo())
81+
.setRecordOffset(template.getRecordOffset())
82+
.setRecordId(template.getRecordId())
83+
.setCausedByDrain(template.causedByDrain());
8184
}
8285

8386
public static class Builder<T> implements OutputBuilder<T> {
@@ -271,7 +274,14 @@ public static <T> WindowedValue<T> of(
271274
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");
272275

273276
if (windows.size() == 1) {
274-
return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain);
277+
return of(
278+
value,
279+
timestamp,
280+
windows.iterator().next(),
281+
paneInfo,
282+
currentRecordId,
283+
currentRecordOffset,
284+
causedByDrain);
275285
} else {
276286
return new TimestampedValueInMultipleWindows<>(
277287
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
@@ -287,7 +297,7 @@ static <T> WindowedValue<T> createWithoutValidation(
287297
PaneInfo paneInfo,
288298
CausedByDrain causedByDrain) {
289299
if (windows.size() == 1) {
290-
return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain);
300+
return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain);
291301
} else {
292302
return new TimestampedValueInMultipleWindows<>(
293303
value, timestamp, windows, paneInfo, null, null, causedByDrain);
@@ -299,7 +309,7 @@ public static <T> WindowedValue<T> of(
299309
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
300310
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
301311

302-
return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL);
312+
return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL);
303313
}
304314

305315
/** Returns a {@code WindowedValue} with the given value, timestamp, and window. */
@@ -308,18 +318,21 @@ public static <T> WindowedValue<T> of(
308318
Instant timestamp,
309319
BoundedWindow window,
310320
PaneInfo paneInfo,
321+
@Nullable String currentRecordId,
322+
@Nullable Long currentRecordOffset,
311323
CausedByDrain causedByDrain) {
312324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
313325

314326
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
315327
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
316-
return valueInGlobalWindow(value, paneInfo);
328+
return new ValueInGlobalWindow<>(
329+
value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
317330
} else if (isGlobal) {
318331
return new TimestampedValueInGlobalWindow<>(
319-
value, timestamp, paneInfo, null, null, causedByDrain);
332+
value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
320333
} else {
321334
return new TimestampedValueInSingleWindow<>(
322-
value, timestamp, window, paneInfo, null, null, causedByDrain);
335+
value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
323336
}
324337
}
325338

0 commit comments

Comments
 (0)