Skip to content

Commit cb1c99c

Browse files
committed
fix npe in ReduceFnRunner, add additional tests
1 parent 7beaf1e commit cb1c99c

File tree

3 files changed

+19
-1
lines changed

3 files changed

+19
-1
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
376376
emit(
377377
contextFactory.base(window, StateStyle.DIRECT),
378378
contextFactory.base(window, StateStyle.RENAMED),
379-
null);
379+
CausedByDrain.NORMAL);
380380
}
381381

382382
// We're all done with merging and emitting elements so can compress the activeWindow state.

sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ public static <T> WindowedValue<T> of(
322322
@Nullable Long currentRecordOffset,
323323
CausedByDrain causedByDrain) {
324324
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
325+
checkArgument(causedByDrain != null, "WindowedValue requires causedByDrain, but it was null");
325326

326327
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
327328
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MetadataPropagationTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.values.CausedByDrain;
2424
import org.apache.beam.sdk.values.PCollection;
2525
import org.apache.beam.sdk.values.WindowedValues;
26+
import org.junit.Assert;
2627
import org.junit.Rule;
2728
import org.junit.Test;
2829
import org.junit.experimental.categories.Category;
@@ -70,6 +71,22 @@ public void testMetadataPropagationAcrossShuffleParameter() {
7071
pipeline.run();
7172
}
7273

74+
@Test
75+
@Category(NeedsRunner.class)
76+
public void testDefaultMetadataPropagationAcrossShuffleParameter() {
77+
Assert.assertFalse(WindowedValues.WindowedValueCoder.isMetadataSupported());
78+
PCollection<String> results =
79+
pipeline
80+
.apply(Create.of(1))
81+
.apply(ParDo.of(new CausedByDrainSettingDoFn()))
82+
.apply(Redistribute.arbitrarily())
83+
.apply(ParDo.of(new CausedByDrainExtractingDoFn()));
84+
85+
PAssert.that(results).containsInAnyOrder("NORMAL");
86+
87+
pipeline.run();
88+
}
89+
7390
@Test
7491
@Category(NeedsRunner.class)
7592
public void testMetadataPropagationParameter() {

0 commit comments

Comments
 (0)