Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.dataflow.qual.Pure;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand All @@ -41,6 +42,7 @@ private LateDataUtils() {}
* Return when {@code window} should be garbage collected. If the window's expiration time is on
* or after the end of the global window, it will be truncated to the end of the global window.
*/
@Pure
public static Instant garbageCollectionTime(
BoundedWindow window, WindowingStrategy windowingStrategy) {
return garbageCollectionTime(window, windowingStrategy.getAllowedLateness());
Expand All @@ -50,6 +52,7 @@ public static Instant garbageCollectionTime(
* Return when {@code window} should be garbage collected. If the window's expiration time is on
* or after the end of the global window, it will be truncated to the end of the global window.
*/
@Pure
public static Instant garbageCollectionTime(BoundedWindow window, Duration allowedLateness) {

// If the end of the window + allowed lateness is beyond the "end of time" aka the end of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,12 @@ public PaneInfo pane() {
}

@Override
public String currentRecordId() {
public @Nullable String currentRecordId() {
return element.getRecordId();
}

@Override
public Long currentRecordOffset() {
public @Nullable Long currentRecordOffset() {
return element.getRecordOffset();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,22 @@

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

/**
* The interface to objects that provide side inputs. Particular implementations may read a side
* input directly or use appropriate sorts of caching, etc.
*/
public interface SideInputReader {
/**
* Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}.
*
* <p>It is valid for a side input to be {@code null}. It is <i>not</i> valid for this to return
* {@code null} for any other reason.
*/
@Nullable
/** Returns the value of the given {@link PCollectionView} for the given {@link BoundedWindow}. */
@Pure
<T> T get(PCollectionView<T> view, BoundedWindow window);

/** Returns true if the given {@link PCollectionView} is valid for this reader. */
@Pure
<T> boolean contains(PCollectionView<T> view);

/** Returns true if there are no side inputs in this reader. */
@Pure
boolean isEmpty();
}
Loading
Loading