Skip to content

Commit 9a5de4f

Browse files
committed
[Drain] Propagate drain information to timerData and TImer data structures. mostly noop.
move from boolean to CausedByDrain { CAUSED_BY_DRAIN, NORMAL } dsdas
1 parent cc8dc2f commit 9a5de4f

26 files changed

+501
-117
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,17 +135,23 @@ public TimersImpl(StateNamespace namespace) {
135135

136136
@Override
137137
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
138-
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
138+
timerInternals.setTimer(
139+
TimerData.of(
140+
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
139141
}
140142

141143
@Override
142144
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
143-
timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain));
145+
timerInternals.setTimer(
146+
TimerData.of(
147+
namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
144148
}
145149

146150
@Override
147151
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
148-
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
152+
timerInternals.deleteTimer(
153+
TimerData.of(
154+
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
149155
}
150156

151157
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,11 @@ public String getErrorContext() {
600600
// Set a timer to continue processing this element.
601601
timerInternals.setTimer(
602602
TimerInternals.TimerData.of(
603-
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
603+
stateNamespace,
604+
wakeupTime,
605+
wakeupTime,
606+
TimeDomain.PROCESSING_TIME,
607+
TimerInternals.TimerData.CausedByDrain.NORMAL));
604608
}
605609

606610
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(

runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,10 @@ void deleteTimer(
168168
/** Data about a timer as represented within {@link TimerInternals}. */
169169
@AutoValue
170170
abstract class TimerData implements Comparable<TimerData> {
171+
public enum CausedByDrain {
172+
CAUSED_BY_DRAIN,
173+
NORMAL
174+
}
171175

172176
public abstract String getTimerId();
173177

@@ -188,6 +192,8 @@ abstract class TimerData implements Comparable<TimerData> {
188192

189193
public abstract boolean getDeleted();
190194

195+
public abstract CausedByDrain causedByDrain();
196+
191197
// When adding a new field, make sure to add it to the compareTo() method.
192198

193199
/** Construct a {@link TimerData} for the given parameters. */
@@ -196,9 +202,10 @@ public static TimerData of(
196202
StateNamespace namespace,
197203
Instant timestamp,
198204
Instant outputTimestamp,
199-
TimeDomain domain) {
205+
TimeDomain domain,
206+
CausedByDrain causedByDrain) {
200207
return new AutoValue_TimerInternals_TimerData(
201-
timerId, "", namespace, timestamp, outputTimestamp, domain, false);
208+
timerId, "", namespace, timestamp, outputTimestamp, domain, false, causedByDrain);
202209
}
203210

204211
/**
@@ -211,19 +218,48 @@ public static TimerData of(
211218
StateNamespace namespace,
212219
Instant timestamp,
213220
Instant outputTimestamp,
214-
TimeDomain domain) {
221+
TimeDomain domain,
222+
CausedByDrain causedByDrain) {
215223
return new AutoValue_TimerInternals_TimerData(
216-
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
224+
timerId,
225+
timerFamilyId,
226+
namespace,
227+
timestamp,
228+
outputTimestamp,
229+
domain,
230+
false,
231+
causedByDrain);
232+
}
233+
234+
public static TimerData of(
235+
String timerId,
236+
String timerFamilyId,
237+
StateNamespace namespace,
238+
Instant timestamp,
239+
Instant outputTimestamp,
240+
TimeDomain domain) {
241+
return of(
242+
timerId,
243+
timerFamilyId,
244+
namespace,
245+
timestamp,
246+
outputTimestamp,
247+
domain,
248+
TimerData.CausedByDrain.NORMAL);
217249
}
218250

219251
/**
220252
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
221253
* deterministically generated from the {@code timestamp} and {@code domain}.
222254
*/
223255
public static TimerData of(
224-
StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) {
256+
StateNamespace namespace,
257+
Instant timestamp,
258+
Instant outputTimestamp,
259+
TimeDomain domain,
260+
CausedByDrain causedByDrain) {
225261
String timerId = String.valueOf(domain.ordinal()) + ':' + timestamp.getMillis();
226-
return of(timerId, namespace, timestamp, outputTimestamp, domain);
262+
return of(timerId, namespace, timestamp, outputTimestamp, domain, causedByDrain);
227263
}
228264

229265
public TimerData deleted() {
@@ -234,7 +270,8 @@ public TimerData deleted() {
234270
getTimestamp(),
235271
getOutputTimestamp(),
236272
getDomain(),
237-
true);
273+
true,
274+
causedByDrain());
238275
}
239276

240277
/**
@@ -272,7 +309,9 @@ public String stringKey() {
272309
+ "/"
273310
+ getTimerFamilyId()
274311
+ ":"
275-
+ getTimerId();
312+
+ getTimerId()
313+
+ ":"
314+
+ causedByDrain();
276315
}
277316
}
278317

@@ -309,7 +348,14 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
309348
Instant timestamp = INSTANT_CODER.decode(inStream);
310349
Instant outputTimestamp = INSTANT_CODER.decode(inStream);
311350
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
312-
return TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain);
351+
return TimerData.of(
352+
timerId,
353+
timerFamilyId,
354+
namespace,
355+
timestamp,
356+
outputTimestamp,
357+
domain,
358+
TimerData.CausedByDrain.NORMAL);
313359
}
314360

315361
@Override
@@ -355,7 +401,8 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
355401
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
356402
Instant timestamp = INSTANT_CODER.decode(inStream);
357403
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
358-
return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
404+
return TimerData.of(
405+
timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL);
359406
}
360407

361408
@Override

runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,21 @@ public class InMemoryTimerInternalsTest {
4141
public void testFiringEventTimers() throws Exception {
4242
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
4343
TimerData eventTimer1 =
44-
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
44+
TimerData.of(
45+
ID1,
46+
NS1,
47+
new Instant(19),
48+
new Instant(19),
49+
TimeDomain.EVENT_TIME,
50+
TimerData.CausedByDrain.NORMAL);
4551
TimerData eventTimer2 =
46-
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
52+
TimerData.of(
53+
ID2,
54+
NS1,
55+
new Instant(29),
56+
new Instant(29),
57+
TimeDomain.EVENT_TIME,
58+
TimerData.CausedByDrain.NORMAL);
4759

4860
underTest.setTimer(eventTimer1);
4961
underTest.setTimer(eventTimer2);
@@ -111,9 +123,19 @@ public void testDeletionById() throws Exception {
111123
public void testFiringProcessingTimeTimers() throws Exception {
112124
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
113125
TimerData processingTime1 =
114-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
126+
TimerData.of(
127+
NS1,
128+
new Instant(19),
129+
new Instant(19),
130+
TimeDomain.PROCESSING_TIME,
131+
TimerData.CausedByDrain.NORMAL);
115132
TimerData processingTime2 =
116-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
133+
TimerData.of(
134+
NS1,
135+
new Instant(29),
136+
new Instant(29),
137+
TimeDomain.PROCESSING_TIME,
138+
TimerData.CausedByDrain.NORMAL);
117139

118140
underTest.setTimer(processingTime1);
119141
underTest.setTimer(processingTime2);
@@ -142,19 +164,47 @@ public void testFiringProcessingTimeTimers() throws Exception {
142164
public void testTimerOrdering() throws Exception {
143165
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
144166
TimerData eventTime1 =
145-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
167+
TimerData.of(
168+
NS1,
169+
new Instant(19),
170+
new Instant(19),
171+
TimeDomain.EVENT_TIME,
172+
TimerData.CausedByDrain.NORMAL);
146173
TimerData processingTime1 =
147-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
174+
TimerData.of(
175+
NS1,
176+
new Instant(19),
177+
new Instant(19),
178+
TimeDomain.PROCESSING_TIME,
179+
TimerData.CausedByDrain.NORMAL);
148180
TimerData synchronizedProcessingTime1 =
149181
TimerData.of(
150-
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
182+
NS1,
183+
new Instant(19),
184+
new Instant(19),
185+
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
186+
TimerData.CausedByDrain.NORMAL);
151187
TimerData eventTime2 =
152-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
188+
TimerData.of(
189+
NS1,
190+
new Instant(29),
191+
new Instant(29),
192+
TimeDomain.EVENT_TIME,
193+
TimerData.CausedByDrain.NORMAL);
153194
TimerData processingTime2 =
154-
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
195+
TimerData.of(
196+
NS1,
197+
new Instant(29),
198+
new Instant(29),
199+
TimeDomain.PROCESSING_TIME,
200+
TimerData.CausedByDrain.NORMAL);
155201
TimerData synchronizedProcessingTime2 =
156202
TimerData.of(
157-
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
203+
NS1,
204+
new Instant(29),
205+
new Instant(29),
206+
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
207+
TimerData.CausedByDrain.NORMAL);
158208

159209
underTest.setTimer(processingTime1);
160210
underTest.setTimer(eventTime1);
@@ -188,9 +238,19 @@ public void testTimerOrdering() throws Exception {
188238
public void testDeduplicate() throws Exception {
189239
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
190240
TimerData eventTime =
191-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
241+
TimerData.of(
242+
NS1,
243+
new Instant(19),
244+
new Instant(19),
245+
TimeDomain.EVENT_TIME,
246+
TimerData.CausedByDrain.NORMAL);
192247
TimerData processingTime =
193-
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
248+
TimerData.of(
249+
NS1,
250+
new Instant(19),
251+
new Instant(19),
252+
TimeDomain.PROCESSING_TIME,
253+
TimerData.CausedByDrain.NORMAL);
194254
underTest.setTimer(eventTime);
195255
underTest.setTimer(eventTime);
196256
underTest.setTimer(processingTime);

runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ public void testEncodeDecodeEqual() throws Exception {
4848
StateNamespaces.global(),
4949
new Instant(500L),
5050
new Instant(500L),
51-
TimeDomain.EVENT_TIME));
51+
TimeDomain.EVENT_TIME,
52+
TimerData.CausedByDrain.NORMAL));
5253
Iterable<WindowedValue<Integer>> elements =
5354
ImmutableList.of(
5455
WindowedValues.valueInGlobalWindow(1),

runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,11 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
574574
ArrayList<TimerData> timers = new ArrayList<>(1);
575575
timers.add(
576576
TimerData.of(
577-
StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
577+
StateNamespaces.window(windowFn.windowCoder(), window),
578+
timestamp,
579+
timestamp,
580+
domain,
581+
TimerData.CausedByDrain.NORMAL));
578582
runner.onTimers(timers);
579583
runner.persist();
580584
}
@@ -588,7 +592,8 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
588592
StateNamespaces.window(windowFn.windowCoder(), window),
589593
timer.getTimestamp(),
590594
timer.getTimestamp(),
591-
timer.getValue()));
595+
timer.getValue(),
596+
TimerData.CausedByDrain.NORMAL));
592597
}
593598
runner.onTimers(timerData);
594599
runner.persist();

runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,8 @@ public void onTimer(OnTimerContext context) {
701701
StateNamespaces.window(windowCoder, (W) context.window()),
702702
context.fireTimestamp(),
703703
context.timestamp(),
704-
context.timeDomain()));
704+
context.timeDomain(),
705+
TimerData.CausedByDrain.NORMAL));
705706
}
706707
}
707708

runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,8 @@ public void testOnTimerCalled() {
317317
StateNamespaces.window(IntervalWindow.getCoder(), window),
318318
timestamp,
319319
timestamp,
320-
TimeDomain.EVENT_TIME)));
320+
TimeDomain.EVENT_TIME,
321+
TimerData.CausedByDrain.NORMAL)));
321322
}
322323

323324
private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
@@ -359,7 +360,8 @@ public <KeyT> void onTimer(
359360
StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
360361
timestamp,
361362
outputTimestamp,
362-
timeDomain));
363+
timeDomain,
364+
TimerData.CausedByDrain.NORMAL));
363365
}
364366

365367
@Override

0 commit comments

Comments
 (0)