Skip to content

Commit fee5855

Browse files
committed
fix npe in ReduceFnRunner, add additional tests, add timestamp validation
1 parent 7beaf1e commit fee5855

File tree

5 files changed

+25
-1
lines changed

5 files changed

+25
-1
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,11 +445,13 @@ public <T> void output(TupleTag<T> tag, T output) {
445445

446446
@Override
447447
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
448+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
448449
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
449450
}
450451

451452
@Override
452453
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
454+
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
453455
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
454456
}
455457

@@ -1308,11 +1310,13 @@ public <T> void outputWindowedValue(
13081310

13091311
@Override
13101312
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1313+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13111314
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
13121315
}
13131316

13141317
@Override
13151318
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1319+
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
13161320
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
13171321
}
13181322

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ public static <T> WindowedValue<T> of(
322322
@Nullable Long currentRecordOffset,
323323
CausedByDrain causedByDrain) {
324324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
325+
checkArgument(causedByDrain != null, "WindowedValue requires causedByDrain, but it was null");
325326

326327
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
327328
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.values.CausedByDrain;
2424
import org.apache.beam.sdk.values.PCollection;
2525
import org.apache.beam.sdk.values.WindowedValues;
26+
import org.junit.Assert;
2627
import org.junit.Rule;
2728
import org.junit.Test;
2829
import org.junit.experimental.categories.Category;
@@ -70,6 +71,22 @@ public void testMetadataPropagationAcrossShuffleParameter() {
7071
pipeline.run();
7172
}
7273

74+
@Test
75+
@Category(NeedsRunner.class)
76+
public void testDefaultMetadataPropagationAcrossShuffleParameter() {
77+
Assert.assertFalse(WindowedValues.WindowedValueCoder.isMetadataSupported());
78+
PCollection<String> results =
79+
pipeline
80+
.apply(Create.of(1))
81+
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
82+
.apply(Redistribute.arbitrarily())
83+
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
84+
85+
PAssert.that(results).containsInAnyOrder("NORMAL");
86+
87+
pipeline.run();
88+
}
89+
7390
@Test
7491
@Category(NeedsRunner.class)
7592
public void testMetadataPropagationParameter() {

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,11 +1954,13 @@ public <T> void outputWindowedValue(
19541954

19551955
@Override
19561956
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
1957+
checkTimestamp(windowedValue.getTimestamp());
19571958
outputTo(mainOutputConsumer, windowedValue);
19581959
}
19591960

19601961
@Override
19611962
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
1963+
checkTimestamp(windowedValue.getTimestamp());
19621964
outputTo((FnDataReceiver) localNameToConsumer.get(tag.getId()), windowedValue);
19631965
}
19641966

0 commit comments

Comments
 (0)