Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
serializedWorkItemSize,
watermarks,
processingContext,
drainMode,
getWorkStreamLatencies) ->
computationStateCache
.get(processingContext.computationId())
Expand All @@ -401,6 +402,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
serializedWorkItemSize,
watermarks,
processingContext,
drainMode,
getWorkStreamLatencies);
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public boolean workIsFailed() {
return work != null && work.isFailed();
}

public boolean getDrainMode() {
return work != null ? work.getDrainMode() : false;
}

public boolean offsetBasedDeduplicationSupported() {
return activeReader != null
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
Expand Down Expand Up @@ -820,7 +824,10 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder))
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator();
}

Expand Down Expand Up @@ -880,7 +887,10 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
Expand Down Expand Up @@ -117,8 +118,12 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
Collection<? extends BoundedWindow> windows =
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
boolean drainingValueFromUpstream = false;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
drainingValueFromUpstream =
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
Expand All @@ -129,11 +134,19 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
T result =
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
// todo #33176 propagate metadata to windowed value
return WindowedValues.of(result, timestampMillis, windows, paneInfo);
return WindowedValues.of(
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
} else {
notifyElementRead(data.available() + metadata.available());
// todo #33176 propagate metadata to windowed value
return WindowedValues.of(decode(valueCoder, data), timestampMillis, windows, paneInfo);
return WindowedValues.of(
decode(valueCoder, data),
timestampMillis,
windows,
paneInfo,
null,
null,
drainingValueFromUpstream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.TimerInternals.TimerData;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>

private final Windmill.WorkItem workItem;
private final K key;
private final boolean drainMode;

private final transient Coder<? extends BoundedWindow> windowCoder;
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
Expand All @@ -70,12 +72,14 @@ public WindmillKeyedWorkItem(
Windmill.WorkItem workItem,
Coder<? extends BoundedWindow> windowCoder,
Coder<Collection<? extends BoundedWindow>> windowsCoder,
Coder<ElemT> valueCoder) {
Coder<ElemT> valueCoder,
boolean drainMode) {
this.key = key;
this.workItem = workItem;
this.windowCoder = windowCoder;
this.windowsCoder = windowsCoder;
this.valueCoder = valueCoder;
this.drainMode = drainMode;
}

@Override
Expand All @@ -93,7 +97,10 @@ public Iterable<TimerData> timersIterable() {
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder));
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
}

@Override
Expand All @@ -108,13 +115,19 @@ public Iterable<WindowedValue<ElemT>> elementsIterable() {
Collection<? extends BoundedWindow> windows =
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
// Draining value is based on upstream data
boolean drainingValueFromUpstream = false;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
BeamFnApi.Elements.ElementMetadata elementMetadata =
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
drainingValueFromUpstream =
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
// todo #33176 specify additional metadata in the future
return WindowedValues.of(value, timestamp, windows, paneInfo);
return WindowedValues.of(
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ static Timer timerDataToWindmillTimer(
}

public static TimerData windmillTimerToTimerData(
WindmillNamespacePrefix prefix, Timer timer, Coder<? extends BoundedWindow> windowCoder) {
WindmillNamespacePrefix prefix,
Timer timer,
Coder<? extends BoundedWindow> windowCoder,
boolean draining) {

// The tag is a path-structure string but cheaper to parse than a proper URI. It follows
// this pattern, where no component but the ID can contain a slash
Expand Down Expand Up @@ -395,6 +398,7 @@ public static TimerData windmillTimerToTimerData(
timestamp,
outputTimestamp,
timerTypeToTimeDomain(timer.getType()));
// todo add draining
}

private static boolean useNewTimerTagEncoding(TimerData timerData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
final K key = keyCoder.decode(context.getSerializedKey().newInput(), Coder.Context.OUTER);
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, valueCoder);
new WindmillKeyedWorkItem<>(
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,22 @@ public final class Work implements RefreshableWork {
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";
private final boolean drainMode;

private Work(
WorkItem workItem,
long serializedWorkItemSize,
Watermarks watermarks,
ProcessingContext processingContext,
boolean drainMode,
Supplier<Instant> clock) {
this.shardedKey = ShardedKey.create(workItem.getKey(), workItem.getShardingKey());
this.workItem = workItem;
this.serializedWorkItemSize = serializedWorkItemSize;
this.processingContext = processingContext;
this.watermarks = watermarks;
this.clock = clock;
this.drainMode = drainMode;
this.startTime = clock.get();
Preconditions.checkState(EMPTY_ENUM_MAP.isEmpty());
// Create by passing EMPTY_ENUM_MAP to avoid recreating
Expand All @@ -110,8 +113,10 @@ public static Work create(
long serializedWorkItemSize,
Watermarks watermarks,
ProcessingContext processingContext,
boolean drainMode,
Supplier<Instant> clock) {
return new Work(workItem, serializedWorkItemSize, watermarks, processingContext, clock);
return new Work(
workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock);
}

public static ProcessingContext createProcessingContext(
Expand Down Expand Up @@ -207,6 +212,10 @@ public State getState() {
return currentState.state();
}

public boolean getDrainMode() {
return drainMode;
}

public void setState(State state) {
Instant now = clock.get();
totalDurationPerState.compute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ private void streamingEngineDispatchLoop(
(computationId,
inputDataWatermark,
synchronizedProcessingTime,
drainMode,
workItem,
serializedWorkItemSize,
getWorkStreamLatencies) ->
Expand All @@ -178,6 +179,7 @@ private void streamingEngineDispatchLoop(
getDataClient,
workCommitter::commit,
heartbeatSender),
drainMode,
getWorkStreamLatencies);
}));
try {
Expand Down Expand Up @@ -239,6 +241,7 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(),
Work.createProcessingContext(
computationId, getDataClient, workCommitter::commit, heartbeatSender),
computationWork.getDrainMode(),
/* getWorkStreamLatencies= */ ImmutableList.of());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,17 @@ private static ComputationMetadata fromProto(
metadataProto.getComputationId(),
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
WindmillTimeUtils.windmillToHarnessWatermark(
metadataProto.getDependentRealtimeInputWatermark()));
metadataProto.getDependentRealtimeInputWatermark()),
metadataProto.getDrainMode());
}

abstract String computationId();

abstract @Nullable Instant inputDataWatermark();

abstract @Nullable Instant synchronizedProcessingTime();

abstract boolean drainMode();
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
assembledWorkItem.bufferedSize(),
createWatermarks(workItem, metadata),
createProcessingContext(metadata.computationId()),
metadata.drainMode(),
assembledWorkItem.latencyAttributions());
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
GetWorkBudget extension = budgetTracker.computeBudgetExtension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
assembledWorkItem.computationMetadata().computationId(),
assembledWorkItem.computationMetadata().inputDataWatermark(),
assembledWorkItem.computationMetadata().synchronizedProcessingTime(),
assembledWorkItem.computationMetadata().drainMode(),
assembledWorkItem.workItem(),
assembledWorkItem.bufferedSize(),
assembledWorkItem.latencyAttributions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void receiveWork(
String computation,
@Nullable Instant inputDataWatermark,
@Nullable Instant synchronizedProcessingTime,
boolean drainMode,
Windmill.WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public interface WorkItemScheduler {
* @param workItem {@link WorkItem} to be processed.
* @param watermarks processing watermarks for the workItem.
* @param processingContext for processing the workItem.
* @param drainMode is job is draining.
* @param getWorkStreamLatencies Latencies per processing stage for the WorkItem for reporting
* back to Streaming Engine backend.
*/
Expand All @@ -43,5 +44,6 @@ void scheduleWork(
long serializedWorkItemSize,
Watermarks watermarks,
Work.ProcessingContext processingContext,
boolean drainMode,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,12 @@ public void scheduleWork(
long serializedWorkItemSize,
Watermarks watermarks,
Work.ProcessingContext processingContext,
boolean drainMode,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
computationState.activateWork(
ExecutableWork.create(
Work.create(workItem, serializedWorkItemSize, watermarks, processingContext, clock),
Work.create(
workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock),
work -> processWork(computationState, work, getWorkStreamLatencies)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
computationWork.getComputationId(),
inputDataWatermark,
Instant.now(),
computationWork.getDrainMode(),
workItem,
workItem.getSerializedSize(),
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ private static ExecutableWork createMockWork(
Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
Work.createProcessingContext(
computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)),
false,
Instant::now),
processWorkFn);
}
Expand Down Expand Up @@ -3552,6 +3553,7 @@ public void testLatencyAttributionProtobufsPopulated() {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
false,
clock);

clock.sleep(Duration.millis(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private <T> WindowedValue<KeyedWorkItem<String, T>> createValue(
return new ValueInEmptyWindows<>(
(KeyedWorkItem<String, T>)
new WindmillKeyedWorkItem<>(
KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder));
KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private <T> WindowedValue<KeyedWorkItem<String, T>> createValue(
return new ValueInEmptyWindows<>(
(KeyedWorkItem<String, T>)
new WindmillKeyedWorkItem<>(
KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder));
KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ private static Work createMockWork(Windmill.WorkItem workItem, Watermarks waterm
watermarks,
Work.createProcessingContext(
COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)),
false,
Instant::now);
}

Expand Down
Loading
Loading