Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
Expand All @@ -38,6 +39,7 @@
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;

Expand All @@ -63,7 +65,6 @@ class UngroupedWindmillReader<T> extends NativeReader<WindowedValue<T>> {
/** A {@link ReaderFactory.Registrar} for ungrouped windmill sources. */
@AutoService(ReaderFactory.Registrar.class)
public static class Registrar implements ReaderFactory.Registrar {

@Override
public Map<String, ReaderFactory> factories() {
Factory factory = new Factory();
Expand Down Expand Up @@ -118,6 +119,7 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
Collection<? extends BoundedWindow> windows =
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());

/**
* https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by
* drain happened upstream
Expand All @@ -129,25 +131,46 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
drainingValueFromUpstream =
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}

// Propagate record ID and offset
String recordId = null;
Long recordOffset = null;
if (context.offsetBasedDeduplicationSupported()) {
byte[] rawId = context.getCurrentRecordId();
if (rawId != null && rawId.length > 0) {
recordId = new String(rawId, StandardCharsets.UTF_8);
}
byte[] rawOffset = context.getCurrentRecordOffset();
if (rawOffset != null && rawOffset.length == Longs.BYTES) {
recordOffset = Longs.fromByteArray(rawOffset);
}
}
Comment on lines +138 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This logic for retrieving record ID and offset from StreamingModeExecutionContext appears to have a potential flaw. The UngroupedWindmillReaderIterator iterates over messages within a WorkItem, but the underlying activeReader in the context (from which getCurrentRecordId and getCurrentRecordOffset get their values) does not appear to be advanced per message. This will likely result in all elements from the same WorkItem receiving the same record ID and offset, which would be incorrect for element-level metadata and could break features like deduplication.

A more robust approach would be to extract the record ID and offset directly from the Windmill.InputMessageBundle for each message, if this information is available there. This would ensure that each element gets its unique metadata.


if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
InputStream key = context.getSerializedKey().newInput();
notifyElementRead(key.available() + data.available() + metadata.available());

@SuppressWarnings("unchecked")
T result =
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));

return WindowedValues.of(
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
result,
timestampMillis,
windows,
paneInfo,
recordId,
recordOffset,
drainingValueFromUpstream);
} else {
notifyElementRead(data.available() + metadata.available());
return WindowedValues.of(
decode(valueCoder, data),
timestampMillis,
windows,
paneInfo,
null,
null,
recordId,
recordOffset,
drainingValueFromUpstream);
}
}
Expand Down
Loading