Skip to content

Commit 204175b

Browse files
committed
Propagate drainMode up to TimerData creation in windmillTimerToTimerData
1 parent 09f4963 commit 204175b

33 files changed

+120
-22
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -389,6 +389,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
389389
serializedWorkItemSize,
390390
watermarks,
391391
processingContext,
392+
drainMode,
392393
getWorkStreamLatencies) ->
393394
computationStateCache
394395
.get(processingContext.computationId())
@@ -401,6 +402,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
401402
serializedWorkItemSize,
402403
watermarks,
403404
processingContext,
405+
drainMode,
404406
getWorkStreamLatencies);
405407
}),
406408
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,10 @@ public boolean workIsFailed() {
196196
return work != null && work.isFailed();
197197
}
198198

199+
public boolean getDrainMode() {
200+
return work != null ? work.getDrainMode() : false;
201+
}
202+
199203
public boolean offsetBasedDeduplicationSupported() {
200204
return activeReader != null
201205
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
@@ -820,7 +824,10 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
820824
.transform(
821825
timer ->
822826
WindmillTimerInternals.windmillTimerToTimerData(
823-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder))
827+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
828+
timer,
829+
windowCoder,
830+
getDrainMode()))
824831
.iterator();
825832
}
826833

@@ -880,7 +887,10 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
880887
.transform(
881888
timer ->
882889
WindmillTimerInternals.windmillTimerToTimerData(
883-
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
890+
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
891+
timer,
892+
windowCoder,
893+
getDrainMode()))
884894
.iterator());
885895
}
886896

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 18 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStream;
2525
import java.util.Collection;
2626
import java.util.Map;
27+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
2728
import org.apache.beam.runners.dataflow.util.CloudObject;
2829
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
2930
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -117,8 +118,14 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
117118
Collection<? extends BoundedWindow> windows =
118119
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
119120
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
121+
boolean drainingValueFromUpstream = false;
120122
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
121-
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
123+
BeamFnApi.Elements.ElementMetadata elementMetadata =
124+
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
125+
drainingValueFromUpstream =
126+
elementMetadata.hasDrain()
127+
? (elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING)
128+
: false;
122129
}
123130
if (valueCoder instanceof KvCoder) {
124131
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -129,11 +136,19 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
129136
T result =
130137
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
131138
// todo #33176 propagate metadata to windowed value
132-
return WindowedValues.of(result, timestampMillis, windows, paneInfo);
139+
return WindowedValues.of(
140+
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
133141
} else {
134142
notifyElementRead(data.available() + metadata.available());
135143
// todo #33176 propagate metadata to windowed value
136-
return WindowedValues.of(decode(valueCoder, data), timestampMillis, windows, paneInfo);
144+
return WindowedValues.of(
145+
decode(valueCoder, data),
146+
timestampMillis,
147+
windows,
148+
paneInfo,
149+
null,
150+
null,
151+
drainingValueFromUpstream);
137152
}
138153
}
139154

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collection;
2525
import java.util.List;
2626
import java.util.Objects;
27+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
2728
import org.apache.beam.runners.core.KeyedWorkItem;
2829
import org.apache.beam.runners.core.KeyedWorkItemCoder;
2930
import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -60,6 +61,7 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
6061

6162
private final Windmill.WorkItem workItem;
6263
private final K key;
64+
private final boolean drainMode;
6365

6466
private final transient Coder<? extends BoundedWindow> windowCoder;
6567
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -70,12 +72,14 @@ public WindmillKeyedWorkItem(
7072
Windmill.WorkItem workItem,
7173
Coder<? extends BoundedWindow> windowCoder,
7274
Coder<Collection<? extends BoundedWindow>> windowsCoder,
73-
Coder<ElemT> valueCoder) {
75+
Coder<ElemT> valueCoder,
76+
boolean drainMode) {
7477
this.key = key;
7578
this.workItem = workItem;
7679
this.windowCoder = windowCoder;
7780
this.windowsCoder = windowsCoder;
7881
this.valueCoder = valueCoder;
82+
this.drainMode = drainMode;
7983
}
8084

8185
@Override
@@ -93,7 +97,10 @@ public Iterable<TimerData> timersIterable() {
9397
.transform(
9498
timer ->
9599
WindmillTimerInternals.windmillTimerToTimerData(
96-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder));
100+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
101+
timer,
102+
windowCoder,
103+
drainMode));
97104
}
98105

99106
@Override
@@ -108,13 +115,22 @@ public Iterable<WindowedValue<ElemT>> elementsIterable() {
108115
Collection<? extends BoundedWindow> windows =
109116
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
110117
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
118+
// Draining value is based on upstream data
119+
boolean drainingValueFromUpstream = false;
111120
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
112-
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
121+
BeamFnApi.Elements.ElementMetadata elementMetadata =
122+
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
123+
drainingValueFromUpstream =
124+
elementMetadata.hasDrain()
125+
? (elementMetadata.getDrain()
126+
== BeamFnApi.Elements.DrainMode.Enum.DRAINING)
127+
: false;
113128
}
114129
InputStream inputStream = message.getData().newInput();
115130
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
116131
// todo #33176 specify additional metadata in the future
117-
return WindowedValues.of(value, timestamp, windows, paneInfo);
132+
return WindowedValues.of(
133+
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
118134
} catch (IOException e) {
119135
throw new RuntimeException(e);
120136
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -301,7 +301,10 @@ static Timer timerDataToWindmillTimer(
301301
}
302302

303303
public static TimerData windmillTimerToTimerData(
304-
WindmillNamespacePrefix prefix, Timer timer, Coder<? extends BoundedWindow> windowCoder) {
304+
WindmillNamespacePrefix prefix,
305+
Timer timer,
306+
Coder<? extends BoundedWindow> windowCoder,
307+
boolean draining) {
305308

306309
// The tag is a path-structure string but cheaper to parse than a proper URI. It follows
307310
// this pattern, where no component but the ID can contain a slash
@@ -395,6 +398,7 @@ public static TimerData windmillTimerToTimerData(
395398
timestamp,
396399
outputTimestamp,
397400
timerTypeToTimeDomain(timer.getType()));
401+
// todo add draining
398402
}
399403

400404
private static boolean useNewTimerTagEncoding(TimerData timerData) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -119,7 +119,8 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
119119
final K key = keyCoder.decode(context.getSerializedKey().newInput(), Coder.Context.OUTER);
120120
final WorkItem workItem = context.getWorkItem();
121121
KeyedWorkItem<K, T> keyedWorkItem =
122-
new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, valueCoder);
122+
new WindmillKeyedWorkItem<>(
123+
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
123124
final boolean isEmptyWorkItem =
124125
(Iterables.isEmpty(keyedWorkItem.timersIterable())
125126
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -78,19 +78,22 @@ public final class Work implements RefreshableWork {
7878
private volatile TimedState currentState;
7979
private volatile boolean isFailed;
8080
private volatile String processingThreadName = "";
81+
private final boolean drainMode;
8182

8283
private Work(
8384
WorkItem workItem,
8485
long serializedWorkItemSize,
8586
Watermarks watermarks,
8687
ProcessingContext processingContext,
88+
boolean drainMode,
8789
Supplier<Instant> clock) {
8890
this.shardedKey = ShardedKey.create(workItem.getKey(), workItem.getShardingKey());
8991
this.workItem = workItem;
9092
this.serializedWorkItemSize = serializedWorkItemSize;
9193
this.processingContext = processingContext;
9294
this.watermarks = watermarks;
9395
this.clock = clock;
96+
this.drainMode = drainMode;
9497
this.startTime = clock.get();
9598
Preconditions.checkState(EMPTY_ENUM_MAP.isEmpty());
9699
// Create by passing EMPTY_ENUM_MAP to avoid recreating
@@ -110,8 +113,10 @@ public static Work create(
110113
long serializedWorkItemSize,
111114
Watermarks watermarks,
112115
ProcessingContext processingContext,
116+
boolean drainMode,
113117
Supplier<Instant> clock) {
114-
return new Work(workItem, serializedWorkItemSize, watermarks, processingContext, clock);
118+
return new Work(
119+
workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock);
115120
}
116121

117122
public static ProcessingContext createProcessingContext(
@@ -207,6 +212,10 @@ public State getState() {
207212
return currentState.state();
208213
}
209214

215+
public boolean getDrainMode() {
216+
return drainMode;
217+
}
218+
210219
public void setState(State state) {
211220
Instant now = clock.get();
212221
totalDurationPerState.compute(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,7 @@ private void streamingEngineDispatchLoop(
155155
(computationId,
156156
inputDataWatermark,
157157
synchronizedProcessingTime,
158+
drainMode,
158159
workItem,
159160
serializedWorkItemSize,
160161
getWorkStreamLatencies) ->
@@ -178,6 +179,7 @@ private void streamingEngineDispatchLoop(
178179
getDataClient,
179180
workCommitter::commit,
180181
heartbeatSender),
182+
drainMode,
181183
getWorkStreamLatencies);
182184
}));
183185
try {
@@ -239,6 +241,7 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
239241
watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(),
240242
Work.createProcessingContext(
241243
computationId, getDataClient, workCommitter::commit, heartbeatSender),
244+
computationWork.getDrainMode(),
242245
/* getWorkStreamLatencies= */ ImmutableList.of());
243246
}
244247
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -124,14 +124,17 @@ private static ComputationMetadata fromProto(
124124
metadataProto.getComputationId(),
125125
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
126126
WindmillTimeUtils.windmillToHarnessWatermark(
127-
metadataProto.getDependentRealtimeInputWatermark()));
127+
metadataProto.getDependentRealtimeInputWatermark()),
128+
metadataProto.getDrainMode());
128129
}
129130

130131
abstract String computationId();
131132

132133
abstract @Nullable Instant inputDataWatermark();
133134

134135
abstract @Nullable Instant synchronizedProcessingTime();
136+
137+
abstract boolean drainMode();
135138
}
136139

137140
@AutoValue

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -281,6 +281,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
281281
assembledWorkItem.bufferedSize(),
282282
createWatermarks(workItem, metadata),
283283
createProcessingContext(metadata.computationId()),
284+
metadata.drainMode(),
284285
assembledWorkItem.latencyAttributions());
285286
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
286287
GetWorkBudget extension = budgetTracker.computeBudgetExtension();

0 commit comments

Comments
 (0)