Skip to content

Commit 0dbde3d

Browse files
committed
ontimer
1 parent 86bb6b1 commit 0dbde3d

File tree

47 files changed

+267
-82
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+267
-82
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
4646
BoundedWindow window,
4747
Instant timestamp,
4848
Instant outputTimestamp,
49-
TimeDomain timeDomain);
49+
TimeDomain timeDomain,
50+
boolean causedByDrain);
5051

5152
/**
5253
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,10 @@ public <KeyT> void onTimer(
8989
BoundedWindow window,
9090
Instant timestamp,
9191
Instant outputTimestamp,
92-
TimeDomain timeDomain) {
93-
doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
92+
TimeDomain timeDomain,
93+
boolean causedByDrain) {
94+
doFnRunner.onTimer(
95+
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
9496
}
9597

9698
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,11 @@ public PaneInfo pane() {
396396
return element.getRecordOffset();
397397
}
398398

399+
@Override
400+
public boolean causedByDrain() {
401+
return false;
402+
}
403+
399404
@Override
400405
public PipelineOptions getPipelineOptions() {
401406
return pipelineOptions;

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,13 @@ public <KeyT> void onTimer(
200200
BoundedWindow window,
201201
Instant timestamp,
202202
Instant outputTimestamp,
203-
TimeDomain timeDomain) {
203+
TimeDomain timeDomain,
204+
boolean causedByDrain) {
204205
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");
205206

206207
OnTimerArgumentProvider<KeyT> argumentProvider =
207-
new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain);
208+
new OnTimerArgumentProvider<>(
209+
timerId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
208210
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
209211
}
210212

@@ -399,6 +401,11 @@ public InputT element() {
399401
return elem.getValue();
400402
}
401403

404+
@Override
405+
public boolean causedByDrain() {
406+
return elem.causedByDrain();
407+
}
408+
402409
@Override
403410
public <T> T sideInput(PCollectionView<T> view) {
404411
checkNotNull(view, "View passed to sideInput cannot be null");
@@ -702,6 +709,7 @@ private class OnTimerArgumentProvider<KeyT> extends DoFn<InputT, OutputT>.OnTime
702709
private final TimeDomain timeDomain;
703710
private final String timerId;
704711
private final KeyT key;
712+
private final Boolean causedByDrain;
705713
private final OutputBuilderSupplier builderSupplier;
706714

707715
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
@@ -727,28 +735,36 @@ private OnTimerArgumentProvider(
727735
BoundedWindow window,
728736
Instant fireTimestamp,
729737
Instant timestamp,
730-
TimeDomain timeDomain) {
738+
TimeDomain timeDomain,
739+
boolean causedByDrain) {
731740
fn.super();
732741
this.timerId = timerId;
733742
this.window = window;
734743
this.fireTimestamp = fireTimestamp;
735744
this.timestamp = timestamp;
736745
this.timeDomain = timeDomain;
737746
this.key = key;
747+
this.causedByDrain = causedByDrain;
738748
this.builderSupplier =
739749
OutputBuilderSuppliers.supplierForElement(
740750
WindowedValues.builder()
741751
.setValue(null)
742752
.setTimestamp(timestamp)
743753
.setWindow(window)
744-
.setPaneInfo(PaneInfo.NO_FIRING));
754+
.setPaneInfo(PaneInfo.NO_FIRING)
755+
.setCausedByDrain(causedByDrain));
745756
}
746757

747758
@Override
748759
public Instant timestamp() {
749760
return timestamp;
750761
}
751762

763+
@Override
764+
public boolean causedByDrain() {
765+
return causedByDrain;
766+
}
767+
752768
@Override
753769
public Instant fireTimestamp() {
754770
return fireTimestamp;

runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public <KeyT> void onTimer(
116116
Instant timestamp,
117117
Instant outputTimestamp,
118118
TimeDomain timeDomain) {
119-
underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
119+
underlying.onTimer(
120+
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, false);
120121
}
121122

122123
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
/**
4949
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
5050
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
51-
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain)}
51+
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain,
52+
* boolean)}
5253
*
5354
* @param <InputT> the type of the {@link DoFn} (main) input elements
5455
* @param <OutputT> the type of the {@link DoFn} (main) output elements
@@ -208,7 +209,8 @@ public <KeyT> void onTimer(
208209
BoundedWindow window,
209210
Instant timestamp,
210211
Instant outputTimestamp,
211-
TimeDomain timeDomain) {
212+
TimeDomain timeDomain,
213+
boolean causedByDrain) {
212214

213215
if (timerId.equals(SORT_FLUSH_TIMER)) {
214216
onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime());
@@ -232,7 +234,14 @@ public <KeyT> void onTimer(
232234
stepContext.timerInternals().currentInputWatermarkTime());
233235
} else {
234236
doFnRunner.onTimer(
235-
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
237+
timerId,
238+
timerFamilyId,
239+
key,
240+
window,
241+
timestamp,
242+
outputTimestamp,
243+
timeDomain,
244+
causedByDrain);
236245
}
237246
}
238247
}

runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() {
140140
GlobalWindow.INSTANCE,
141141
new Instant(0),
142142
new Instant(0),
143-
TimeDomain.EVENT_TIME);
143+
TimeDomain.EVENT_TIME,
144+
false);
144145
}
145146

146147
/**
@@ -265,7 +266,8 @@ public void testOnTimerCalled() {
265266
GlobalWindow.INSTANCE,
266267
currentTime.plus(offset),
267268
currentTime.plus(offset),
268-
TimeDomain.EVENT_TIME);
269+
TimeDomain.EVENT_TIME,
270+
false);
269271

270272
assertThat(
271273
fn.onTimerInvocations,
@@ -276,7 +278,8 @@ public void testOnTimerCalled() {
276278
StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
277279
currentTime.plus(offset),
278280
currentTime.plus(offset),
279-
TimeDomain.EVENT_TIME)));
281+
TimeDomain.EVENT_TIME,
282+
false)));
280283
}
281284

282285
/**
@@ -592,7 +595,8 @@ public void testOnTimerAllowedSkew() {
592595
GlobalWindow.INSTANCE,
593596
new Instant(0),
594597
new Instant(0),
595-
TimeDomain.EVENT_TIME);
598+
TimeDomain.EVENT_TIME,
599+
false);
596600
}
597601

598602
@Test
@@ -624,7 +628,8 @@ public void testOnTimerNoSkew() {
624628
GlobalWindow.INSTANCE,
625629
new Instant(0),
626630
new Instant(0),
627-
TimeDomain.EVENT_TIME);
631+
TimeDomain.EVENT_TIME,
632+
false);
628633
});
629634

630635
assertThat(exception.getCause(), isA(IllegalArgumentException.class));
@@ -702,7 +707,7 @@ public void onTimer(OnTimerContext context) {
702707
context.fireTimestamp(),
703708
context.timestamp(),
704709
context.timeDomain(),
705-
false));
710+
context.causedByDrain()));
706711
}
707712
}
708713

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,8 @@ public <KeyT> void onTimer(
352352
BoundedWindow window,
353353
Instant timestamp,
354354
Instant outputTimestamp,
355-
TimeDomain timeDomain) {
355+
TimeDomain timeDomain,
356+
boolean causedByDrain) {
356357
firedTimers.add(
357358
TimerData.of(
358359
timerId,
@@ -361,7 +362,7 @@ public <KeyT> void onTimer(
361362
timestamp,
362363
outputTimestamp,
363364
timeDomain,
364-
false));
365+
causedByDrain));
365366
}
366367

367368
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,8 @@ private static void advanceInputWatermark(
468468
window,
469469
timer.getTimestamp(),
470470
timer.getOutputTimestamp(),
471-
timer.getDomain());
471+
timer.getDomain(),
472+
timer.causedByDrain());
472473
}
473474
}
474475

runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,19 @@ public <KeyT> void onTimer(
7373
final BoundedWindow window,
7474
final Instant timestamp,
7575
final Instant outputTimestamp,
76-
final TimeDomain timeDomain) {
76+
final TimeDomain timeDomain,
77+
boolean causedByDrain) {
7778
try (Closeable ignored =
7879
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
79-
delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
80+
delegate.onTimer(
81+
timerId,
82+
timerFamilyId,
83+
key,
84+
window,
85+
timestamp,
86+
outputTimestamp,
87+
timeDomain,
88+
causedByDrain);
8089
} catch (IOException e) {
8190
throw new RuntimeException(e);
8291
}

0 commit comments

Comments
 (0)