Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,23 @@ public TimersImpl(StateNamespace namespace) {

@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
timerInternals.setTimer(
TimerData.of(
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
}

@Override
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain));
timerInternals.setTimer(
TimerData.of(
namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
}

@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
timerInternals.deleteTimer(
TimerData.of(
namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,11 @@ public String getErrorContext() {
// Set a timer to continue processing this element.
timerInternals.setTimer(
TimerInternals.TimerData.of(
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
stateNamespace,
wakeupTime,
wakeupTime,
TimeDomain.PROCESSING_TIME,
TimerInternals.TimerData.CausedByDrain.NORMAL));
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ void deleteTimer(
/** Data about a timer as represented within {@link TimerInternals}. */
@AutoValue
abstract class TimerData implements Comparable<TimerData> {
public enum CausedByDrain {
CAUSED_BY_DRAIN,
NORMAL
}

public abstract String getTimerId();

Expand All @@ -188,6 +192,8 @@ abstract class TimerData implements Comparable<TimerData> {

public abstract boolean getDeleted();

public abstract CausedByDrain causedByDrain();

// When adding a new field, make sure to add it to the compareTo() method.

/** Construct a {@link TimerData} for the given parameters. */
Expand All @@ -196,9 +202,10 @@ public static TimerData of(
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
TimeDomain domain,
CausedByDrain causedByDrain) {
return new AutoValue_TimerInternals_TimerData(
timerId, "", namespace, timestamp, outputTimestamp, domain, false);
timerId, "", namespace, timestamp, outputTimestamp, domain, false, causedByDrain);
}

/**
Expand All @@ -211,19 +218,48 @@ public static TimerData of(
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
TimeDomain domain,
CausedByDrain causedByDrain) {
return new AutoValue_TimerInternals_TimerData(
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
timerId,
timerFamilyId,
namespace,
timestamp,
outputTimestamp,
domain,
false,
causedByDrain);
}

public static TimerData of(
String timerId,
String timerFamilyId,
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
return of(
timerId,
timerFamilyId,
namespace,
timestamp,
outputTimestamp,
domain,
TimerData.CausedByDrain.NORMAL);
}

/**
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
*/
public static TimerData of(
StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) {
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain,
CausedByDrain causedByDrain) {
String timerId = String.valueOf(domain.ordinal()) + ':' + timestamp.getMillis();
return of(timerId, namespace, timestamp, outputTimestamp, domain);
return of(timerId, namespace, timestamp, outputTimestamp, domain, causedByDrain);
}

public TimerData deleted() {
Expand All @@ -234,7 +270,8 @@ public TimerData deleted() {
getTimestamp(),
getOutputTimestamp(),
getDomain(),
true);
true,
causedByDrain());
}

/**
Expand Down Expand Up @@ -272,7 +309,9 @@ public String stringKey() {
+ "/"
+ getTimerFamilyId()
+ ":"
+ getTimerId();
+ getTimerId()
+ ":"
+ causedByDrain();
}
}

Expand Down Expand Up @@ -309,7 +348,14 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
Instant timestamp = INSTANT_CODER.decode(inStream);
Instant outputTimestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain);
return TimerData.of(
timerId,
timerFamilyId,
namespace,
timestamp,
outputTimestamp,
domain,
TimerData.CausedByDrain.NORMAL);
}

@Override
Expand Down Expand Up @@ -355,7 +401,8 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
return TimerData.of(
timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,21 @@ public class InMemoryTimerInternalsTest {
public void testFiringEventTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTimer1 =
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(
ID1,
NS1,
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData eventTimer2 =
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
TimerData.of(
ID2,
NS1,
new Instant(29),
new Instant(29),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);

underTest.setTimer(eventTimer1);
underTest.setTimer(eventTimer2);
Expand Down Expand Up @@ -111,9 +123,19 @@ public void testDeletionById() throws Exception {
public void testFiringProcessingTimeTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);

underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);
Expand Down Expand Up @@ -142,19 +164,47 @@ public void testFiringProcessingTimeTimers() throws Exception {
public void testTimerOrdering() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData synchronizedProcessingTime1 =
TimerData.of(
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
NS1,
new Instant(19),
new Instant(19),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData eventTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1,
new Instant(29),
new Instant(29),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData synchronizedProcessingTime2 =
TimerData.of(
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
NS1,
new Instant(29),
new Instant(29),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);

underTest.setTimer(processingTime1);
underTest.setTimer(eventTime1);
Expand Down Expand Up @@ -188,9 +238,19 @@ public void testTimerOrdering() throws Exception {
public void testDeduplicate() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL);
TimerData processingTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(
NS1,
new Instant(19),
new Instant(19),
TimeDomain.PROCESSING_TIME,
TimerData.CausedByDrain.NORMAL);
underTest.setTimer(eventTime);
underTest.setTimer(eventTime);
underTest.setTimer(processingTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public void testEncodeDecodeEqual() throws Exception {
StateNamespaces.global(),
new Instant(500L),
new Instant(500L),
TimeDomain.EVENT_TIME));
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL));
Iterable<WindowedValue<Integer>> elements =
ImmutableList.of(
WindowedValues.valueInGlobalWindow(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
ArrayList<TimerData> timers = new ArrayList<>(1);
timers.add(
TimerData.of(
StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
StateNamespaces.window(windowFn.windowCoder(), window),
timestamp,
timestamp,
domain,
TimerData.CausedByDrain.NORMAL));
runner.onTimers(timers);
runner.persist();
}
Expand All @@ -588,7 +592,8 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
StateNamespaces.window(windowFn.windowCoder(), window),
timer.getTimestamp(),
timer.getTimestamp(),
timer.getValue()));
timer.getValue(),
TimerData.CausedByDrain.NORMAL));
}
runner.onTimers(timerData);
runner.persist();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ public void onTimer(OnTimerContext context) {
StateNamespaces.window(windowCoder, (W) context.window()),
context.fireTimestamp(),
context.timestamp(),
context.timeDomain()));
context.timeDomain(),
TimerData.CausedByDrain.NORMAL));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ public void testOnTimerCalled() {
StateNamespaces.window(IntervalWindow.getCoder(), window),
timestamp,
timestamp,
TimeDomain.EVENT_TIME)));
TimeDomain.EVENT_TIME,
TimerData.CausedByDrain.NORMAL)));
}

private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
Expand Down Expand Up @@ -359,7 +360,8 @@ public <KeyT> void onTimer(
StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
timestamp,
outputTimestamp,
timeDomain));
timeDomain,
TimerData.CausedByDrain.NORMAL));
}

@Override
Expand Down
Loading
Loading