Skip to content

Commit 55439e2

Browse files
committed
fix npe in ReduceFnRunner, add additional tests, add timestamp validation.
1 parent 7beaf1e commit 55439e2

File tree

7 files changed

+53
-25
lines changed

7 files changed

+53
-25
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -472,23 +472,22 @@ public <T> void outputWindowedValue(
472472
element.causedByDrain()));
473473
}
474474

475-
@Override
476-
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
475+
private void noteOutputAndObserveTimestamp(Instant timestamp) {
477476
noteOutput();
478477
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
479-
((TimestampObservingWatermarkEstimator) watermarkEstimator)
480-
.observeTimestamp(windowedValue.getTimestamp());
478+
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
481479
}
480+
}
481+
482+
@Override
483+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
484+
noteOutputAndObserveTimestamp(windowedValue.getTimestamp());
482485
outputReceiver.output(mainOutputTag, windowedValue);
483486
}
484487

485488
@Override
486489
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
487-
noteOutput();
488-
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
489-
((TimestampObservingWatermarkEstimator) watermarkEstimator)
490-
.observeTimestamp(windowedValue.getTimestamp());
491-
}
490+
noteOutputAndObserveTimestamp(windowedValue.getTimestamp());
492491
outputReceiver.output(tag, windowedValue);
493492
}
494493

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,13 @@ public <T> void output(TupleTag<T> tag, T output) {
445445

446446
@Override
447447
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
448449
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
449450
}
450451

451452
@Override
452453
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
454+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
453455
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
454456
}
455457

@@ -1039,11 +1041,13 @@ public <T> void outputWindowedValue(
10391041

10401042
@Override
10411043
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1044+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10421045
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
10431046
}
10441047

10451048
@Override
10461049
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1050+
checkTimestamp(timestamp(), windowedValue.getTimestamp());
10471051
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
10481052
}
10491053

@@ -1308,11 +1312,13 @@ public <T> void outputWindowedValue(
13081312

13091313
@Override
13101314
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1315+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13111316
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
13121317
}
13131318

13141319
@Override
13151320
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1321+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13161322
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
13171323
}
13181324

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -644,10 +644,9 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
644644
CausedByDrain.NORMAL));
645645
}
646646

647-
@Override
648-
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
647+
private <T> void doOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
649648
for (BoundedWindow w : windowedValue.getWindows()) {
650-
getMutableOutput(mainOutputTag)
649+
getMutableOutput(tag)
651650
.add(
652651
ValueInSingleWindow.of(
653652
windowedValue.getValue(),
@@ -660,20 +659,14 @@ public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
660659
}
661660
}
662661

662+
@Override
663+
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
664+
doOutputWindowedValue(mainOutputTag, windowedValue);
665+
}
666+
663667
@Override
664668
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
665-
for (BoundedWindow w : windowedValue.getWindows()) {
666-
getMutableOutput(tag)
667-
.add(
668-
ValueInSingleWindow.of(
669-
windowedValue.getValue(),
670-
windowedValue.getTimestamp(),
671-
w,
672-
windowedValue.getPaneInfo(),
673-
windowedValue.getRecordId(),
674-
windowedValue.getRecordOffset(),
675-
windowedValue.causedByDrain()));
676-
}
669+
doOutputWindowedValue(tag, windowedValue);
677670
}
678671

679672
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ public static <T> WindowedValue<T> of(
322322
@Nullable Long currentRecordOffset,
323323
CausedByDrain causedByDrain) {
324324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
325+
checkArgument(causedByDrain != null, "WindowedValue requires causedByDrain, but it was null");
325326

326327
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
327328
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
@@ -861,6 +862,10 @@ public static void setMetadataSupported() {
861862
metadataSupported = true;
862863
}
863864

865+
public static void setMetadataNotSupported() {
866+
metadataSupported = false;
867+
}
868+
864869
public static boolean isMetadataSupported() {
865870
return metadataSupported;
866871
}

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.apache.beam.sdk.values.CausedByDrain;
2424
import org.apache.beam.sdk.values.PCollection;
2525
import org.apache.beam.sdk.values.WindowedValues;
26+
import org.junit.After;
27+
import org.junit.Assert;
2628
import org.junit.Rule;
2729
import org.junit.Test;
2830
import org.junit.experimental.categories.Category;
@@ -70,6 +72,27 @@ public void testMetadataPropagationAcrossShuffleParameter() {
7072
pipeline.run();
7173
}
7274

75+
@After
76+
public void turnOffMetadata() {
77+
WindowedValues.WindowedValueCoder.setMetadataNotSupported();
78+
}
79+
80+
@Test
81+
@Category(NeedsRunner.class)
82+
public void testDefaultMetadataPropagationAcrossShuffleParameter() {
83+
Assert.assertFalse(WindowedValues.WindowedValueCoder.isMetadataSupported());
84+
PCollection<String> results =
85+
pipeline
86+
.apply(Create.of(1))
87+
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
88+
.apply(Redistribute.arbitrarily())
89+
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
90+
91+
PAssert.that(results).containsInAnyOrder("NORMAL");
92+
93+
pipeline.run();
94+
}
95+
7396
@Test
7497
@Category(NeedsRunner.class)
7598
public void testMetadataPropagationParameter() {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,11 +1954,13 @@ public <T> void outputWindowedValue(
19541954

19551955
@Override
19561956
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1957+
checkTimestamp(windowedValue.getTimestamp());
19571958
outputTo(mainOutputConsumer, windowedValue);
19581959
}
19591960

19601961
@Override
19611962
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1963+
checkTimestamp(windowedValue.getTimestamp());
19621964
outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue);
19631965
}
19641966

0 commit comments

Comments
 (0)