Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public interface DoFnRunner<InputT extends @Nullable Object, OutputT extends @Nu
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain);
TimeDomain timeDomain,
boolean causedByDrain);

/**
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,10 @@ public <KeyT> void onTimer(
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
TimeDomain timeDomain,
boolean causedByDrain) {
doFnRunner.onTimer(
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,11 @@ public PaneInfo pane() {
return element.getRecordOffset();
}

@Override
public boolean causedByDrain() {
return false;
}

@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,17 +135,18 @@ public TimersImpl(StateNamespace namespace) {

@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
}

@Override
public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) {
timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain));
timerInternals.setTimer(
TimerData.of(namespace, timestamp, outputTimestamp, timeDomain, false));
}

@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,13 @@ public <KeyT> void onTimer(
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
TimeDomain timeDomain,
boolean causedByDrain) {
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");

OnTimerArgumentProvider<KeyT> argumentProvider =
new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain);
new OnTimerArgumentProvider<>(
timerId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
}

Expand Down Expand Up @@ -399,6 +401,11 @@ public InputT element() {
return elem.getValue();
}

@Override
public boolean causedByDrain() {
return elem.causedByDrain();
}

@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(view, "View passed to sideInput cannot be null");
Expand Down Expand Up @@ -702,6 +709,7 @@ private class OnTimerArgumentProvider<KeyT> extends DoFn<InputT, OutputT>.OnTime
private final TimeDomain timeDomain;
private final String timerId;
private final KeyT key;
private final Boolean causedByDrain;
private final OutputBuilderSupplier builderSupplier;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
Expand All @@ -727,28 +735,36 @@ private OnTimerArgumentProvider(
BoundedWindow window,
Instant fireTimestamp,
Instant timestamp,
TimeDomain timeDomain) {
TimeDomain timeDomain,
boolean causedByDrain) {
fn.super();
this.timerId = timerId;
this.window = window;
this.fireTimestamp = fireTimestamp;
this.timestamp = timestamp;
this.timeDomain = timeDomain;
this.key = key;
this.causedByDrain = causedByDrain;
this.builderSupplier =
OutputBuilderSuppliers.supplierForElement(
WindowedValues.builder()
.setValue(null)
.setTimestamp(timestamp)
.setWindow(window)
.setPaneInfo(PaneInfo.NO_FIRING));
.setPaneInfo(PaneInfo.NO_FIRING)
.setCausedByDrain(causedByDrain));
}

@Override
public Instant timestamp() {
return timestamp;
}

@Override
public boolean causedByDrain() {
return causedByDrain;
}

@Override
public Instant fireTimestamp() {
return fireTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public <KeyT> void onTimer(
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
underlying.onTimer(
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ public String getErrorContext() {
// Set a timer to continue processing this element.
timerInternals.setTimer(
TimerInternals.TimerData.of(
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME, false));
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
/**
* A customized {@link DoFnRunner} that handles late data dropping and garbage collection for
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)}
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain)}
* and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain,
* boolean)}
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
Expand Down Expand Up @@ -208,7 +209,8 @@ public <KeyT> void onTimer(
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
TimeDomain timeDomain) {
TimeDomain timeDomain,
boolean causedByDrain) {

if (timerId.equals(SORT_FLUSH_TIMER)) {
onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime());
Expand All @@ -232,7 +234,14 @@ public <KeyT> void onTimer(
stepContext.timerInternals().currentInputWatermarkTime());
} else {
doFnRunner.onTimer(
timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
timerId,
timerFamilyId,
key,
window,
timestamp,
outputTimestamp,
timeDomain,
causedByDrain);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ abstract class TimerData implements Comparable<TimerData> {

public abstract boolean getDeleted();

public abstract boolean causedByDrain();

// When adding a new field, make sure to add it to the compareTo() method.

/** Construct a {@link TimerData} for the given parameters. */
Expand All @@ -196,9 +198,10 @@ public static TimerData of(
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
TimeDomain domain,
boolean causedByDrain) {
return new AutoValue_TimerInternals_TimerData(
timerId, "", namespace, timestamp, outputTimestamp, domain, false);
timerId, "", namespace, timestamp, outputTimestamp, domain, false, causedByDrain);
}

/**
Expand All @@ -211,19 +214,41 @@ public static TimerData of(
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
TimeDomain domain,
boolean causedByDrain) {
return new AutoValue_TimerInternals_TimerData(
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
timerId,
timerFamilyId,
namespace,
timestamp,
outputTimestamp,
domain,
false,
causedByDrain);
}

public static TimerData of(
String timerId,
String timerFamilyId,
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain) {
return of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
}

/**
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
*/
public static TimerData of(
StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) {
StateNamespace namespace,
Instant timestamp,
Instant outputTimestamp,
TimeDomain domain,
boolean causedByDrain) {
String timerId = String.valueOf(domain.ordinal()) + ':' + timestamp.getMillis();
return of(timerId, namespace, timestamp, outputTimestamp, domain);
return of(timerId, namespace, timestamp, outputTimestamp, domain, causedByDrain);
}

public TimerData deleted() {
Expand All @@ -234,7 +259,8 @@ public TimerData deleted() {
getTimestamp(),
getOutputTimestamp(),
getDomain(),
true);
true,
causedByDrain());
}

/**
Expand Down Expand Up @@ -272,7 +298,9 @@ public String stringKey() {
+ "/"
+ getTimerFamilyId()
+ ":"
+ getTimerId();
+ getTimerId()
+ ":"
+ causedByDrain();
}
}

Expand Down Expand Up @@ -309,7 +337,8 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
Instant timestamp = INSTANT_CODER.decode(inStream);
Instant outputTimestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain);
return TimerData.of(
timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false);
}

@Override
Expand Down Expand Up @@ -355,7 +384,7 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
return TimerData.of(timerId, namespace, timestamp, timestamp, domain, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class InMemoryTimerInternalsTest {
public void testFiringEventTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTimer1 =
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
TimerData eventTimer2 =
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, false);

underTest.setTimer(eventTimer1);
underTest.setTimer(eventTimer2);
Expand Down Expand Up @@ -111,9 +111,9 @@ public void testDeletionById() throws Exception {
public void testFiringProcessingTimeTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, false);

underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);
Expand Down Expand Up @@ -142,19 +142,19 @@ public void testFiringProcessingTimeTimers() throws Exception {
public void testTimerOrdering() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
TimerData processingTime1 =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
TimerData synchronizedProcessingTime1 =
TimerData.of(
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, false);
TimerData eventTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME, false);
TimerData processingTime2 =
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME, false);
TimerData synchronizedProcessingTime2 =
TimerData.of(
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME, false);

underTest.setTimer(processingTime1);
underTest.setTimer(eventTime1);
Expand Down Expand Up @@ -188,9 +188,9 @@ public void testTimerOrdering() throws Exception {
public void testDeduplicate() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData eventTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME, false);
TimerData processingTime =
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME, false);
underTest.setTimer(eventTime);
underTest.setTimer(eventTime);
underTest.setTimer(processingTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public void testEncodeDecodeEqual() throws Exception {
StateNamespaces.global(),
new Instant(500L),
new Instant(500L),
TimeDomain.EVENT_TIME));
TimeDomain.EVENT_TIME,
false));
Iterable<WindowedValue<Integer>> elements =
ImmutableList.of(
WindowedValues.valueInGlobalWindow(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc
ArrayList<TimerData> timers = new ArrayList<>(1);
timers.add(
TimerData.of(
StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
StateNamespaces.window(windowFn.windowCoder(), window),
timestamp,
timestamp,
domain,
false));
runner.onTimers(timers);
runner.persist();
}
Expand All @@ -588,7 +592,8 @@ public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws
StateNamespaces.window(windowFn.windowCoder(), window),
timer.getTimestamp(),
timer.getTimestamp(),
timer.getValue()));
timer.getValue(),
false));
}
runner.onTimers(timerData);
runner.persist();
Expand Down
Loading
Loading