Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,25 @@ public <T> void outputWindowedValue(
element.causedByDrain()));
}

private void noteOutputAndObserveTimestamp(Instant timestamp) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
}
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
noteOutputAndObserveTimestamp(windowedValue.getTimestamp());
outputReceiver.output(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
noteOutputAndObserveTimestamp(windowedValue.getTimestamp());
outputReceiver.output(tag, windowedValue);
}

private void noteOutput() {
checkState(!hasClaimFailed, "Output is not allowed after a failed tryClaim()");
checkState(numClaimedBlocks > 0, "Output is not allowed before tryClaim()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public void processElements(Iterable<WindowedValue<InputT>> values) throws Excep
emit(
contextFactory.base(window, StateStyle.DIRECT),
contextFactory.base(window, StateStyle.RENAMED),
null);
CausedByDrain.NORMAL);
}

// We're all done with merging and emitting elements so can compress the activeWindow state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,18 @@ public <T> void output(TupleTag<T> tag, T output) {
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(elem.getTimestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
Expand Down Expand Up @@ -1027,6 +1039,18 @@ public <T> void outputWindowedValue(
.output();
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(timestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(timestamp(), windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -1286,6 +1310,18 @@ public <T> void outputWindowedValue(
.output();
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
checkTimestamp(this.timestamp, windowedValue.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, windowedValue);
}

@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;
Expand Down Expand Up @@ -284,6 +285,10 @@ public abstract <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo);

public abstract <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue);

public abstract void outputWindowedValue(WindowedValue<OutputT> windowedValue);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kennknowles I'm really curious if we are able to avoid that. You've spent quite a lot of time on outputbuilder work to avoid changing public interface and this method is exposed in ProcessContext so any user can use it if they want.

}

/** Information accessible when running a {@link DoFn.ProcessElement} method. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,9 @@ public OutputBuilder<T> builder(T value) {
@Override
public void output(WindowedValue<T> windowedValue) {
if (outputTag != null) {
context.outputWindowedValue(
outputTag,
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo());
context.outputWindowedValue(outputTag, windowedValue);
} else {
((DoFn<?, T>.WindowedContext) context)
.outputWindowedValue(
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo());
((DoFn<?, T>.WindowedContext) context).outputWindowedValue(windowedValue);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,31 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
CausedByDrain.NORMAL));
}

private <T> void doOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
for (BoundedWindow w : windowedValue.getWindows()) {
getMutableOutput(tag)
.add(
ValueInSingleWindow.of(
windowedValue.getValue(),
windowedValue.getTimestamp(),
w,
windowedValue.getPaneInfo(),
windowedValue.getRecordId(),
windowedValue.getRecordOffset(),
windowedValue.causedByDrain()));
}
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
doOutputWindowedValue(mainOutputTag, windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
doOutputWindowedValue(tag, windowedValue);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public void processElement(
.setTimestamp(kv.getValue().getTimestamp())
.setWindow(kv.getValue().getWindow())
.setPaneInfo(kv.getValue().getPaneInfo())
.setCausedByDrain(kv.getValue().getCausedByDrain())
.output();
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> i
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
ProcessContext pc,
@Element KV<K, V> element,
@DoFn.Timestamp Instant timestamp,
BoundedWindow window,
Expand All @@ -150,7 +151,13 @@ public void processElement(
KV.of(
element.getKey(),
ValueInSingleWindow.of(
element.getValue(), timestamp, window, paneInfo)));
element.getValue(),
timestamp,
window,
paneInfo,
pc.currentRecordId(),
pc.currentRecordOffset(),
pc.causedByDrain())));
}
}))
.setCoder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -560,13 +561,7 @@ public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
public OutputBuilder<OutputT> builder(OutputT value) {
return outputBuilderSupplier
.builder(value)
.setReceiver(
windowedValue ->
outerContext.outputWindowedValue(
windowedValue.getValue(),
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPaneInfo()));
.setReceiver(windowedValue -> outerContext.outputWindowedValue(windowedValue));
}
};
}
Expand Down Expand Up @@ -659,6 +654,16 @@ public <T> void outputWindowedValue(
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}

@Override
public void outputWindowedValue(WindowedValue<OutputT> windowedValue) {
outerContext.outputWindowedValue(windowedValue);
}

@Override
public <T> void outputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedValue) {
outerContext.outputWindowedValue(tag, windowedValue);
}

@Override
public InputT element() {
return element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
BeamFnApi.Elements.ElementMetadata.Builder builder =
BeamFnApi.Elements.ElementMetadata.newBuilder();
// todo #33176 specify additional metadata in the future
builder.setDrain(
windowedElem.getCausedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ public static <T> Builder<T> builder(WindowedValue<T> template) {
.setValue(template.getValue())
.setTimestamp(template.getTimestamp())
.setWindows(template.getWindows())
.setPaneInfo(template.getPaneInfo());
.setPaneInfo(template.getPaneInfo())
.setRecordOffset(template.getRecordOffset())
.setRecordId(template.getRecordId())
.setCausedByDrain(template.causedByDrain());
}

public static class Builder<T> implements OutputBuilder<T> {
Expand Down Expand Up @@ -271,7 +274,14 @@ public static <T> WindowedValue<T> of(
checkArgument(windows.size() > 0, "WindowedValue requires windows, but there were none");

if (windows.size() == 1) {
return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain);
return of(
value,
timestamp,
windows.iterator().next(),
paneInfo,
currentRecordId,
currentRecordOffset,
causedByDrain);
} else {
return new TimestampedValueInMultipleWindows<>(
value, timestamp, windows, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
Expand All @@ -287,7 +297,7 @@ static <T> WindowedValue<T> createWithoutValidation(
PaneInfo paneInfo,
CausedByDrain causedByDrain) {
if (windows.size() == 1) {
return of(value, timestamp, windows.iterator().next(), paneInfo, causedByDrain);
return of(value, timestamp, windows.iterator().next(), paneInfo, null, null, causedByDrain);
} else {
return new TimestampedValueInMultipleWindows<>(
value, timestamp, windows, paneInfo, null, null, causedByDrain);
Expand All @@ -299,7 +309,7 @@ public static <T> WindowedValue<T> of(
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");

return of(value, timestamp, window, paneInfo, CausedByDrain.NORMAL);
return of(value, timestamp, window, paneInfo, null, null, CausedByDrain.NORMAL);
}

/** Returns a {@code WindowedValue} with the given value, timestamp, and window. */
Expand All @@ -308,18 +318,22 @@ public static <T> WindowedValue<T> of(
Instant timestamp,
BoundedWindow window,
PaneInfo paneInfo,
@Nullable String currentRecordId,
@Nullable Long currentRecordOffset,
CausedByDrain causedByDrain) {
checkArgument(paneInfo != null, "WindowedValue requires PaneInfo, but it was null");
checkArgument(causedByDrain != null, "WindowedValue requires causedByDrain, but it was null");

boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
return valueInGlobalWindow(value, paneInfo);
return new ValueInGlobalWindow<>(
value, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
} else if (isGlobal) {
return new TimestampedValueInGlobalWindow<>(
value, timestamp, paneInfo, null, null, causedByDrain);
value, timestamp, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
} else {
return new TimestampedValueInSingleWindow<>(
value, timestamp, window, paneInfo, null, null, causedByDrain);
value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, causedByDrain);
}
}

Expand Down Expand Up @@ -848,6 +862,10 @@ public static void setMetadataSupported() {
metadataSupported = true;
}

public static void setMetadataNotSupported() {
metadataSupported = false;
}

public static boolean isMetadataSupported() {
return metadataSupported;
}
Expand Down
Loading
Loading