Skip to content

Commit dfe01c0

Browse files
authored
Add WindmillTagEncoding Interface (#37150)
1 parent 5d658c1 commit dfe01c0

15 files changed

+279
-178
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@
6161
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
6262
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
6363
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
64-
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
64+
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
65+
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
6566
import org.apache.beam.sdk.annotations.Internal;
6667
import org.apache.beam.sdk.coders.Coder;
6768
import org.apache.beam.sdk.io.UnboundedSource;
@@ -119,6 +120,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
119120
*/
120121
private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> sideInputCache;
121122

123+
private final WindmillTagEncoding windmillTagEncoding;
122124
/**
123125
* The current user-facing key for this execution context.
124126
*
@@ -169,6 +171,7 @@ public StreamingModeExecutionContext(
169171
this.readerCache = readerCache;
170172
this.globalConfigHandle = globalConfigHandle;
171173
this.sideInputCache = new HashMap<>();
174+
this.windmillTagEncoding = WindmillTagEncodingV1.instance();
172175
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
173176
this.stateCache = stateCache;
174177
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
@@ -200,6 +203,10 @@ public boolean getDrainMode() {
200203
return work != null ? work.getDrainMode() : false;
201204
}
202205

206+
public WindmillTagEncoding getWindmillTagEncoding() {
207+
return windmillTagEncoding;
208+
}
209+
203210
public boolean offsetBasedDeduplicationSupported() {
204211
return activeReader != null
205212
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
@@ -778,7 +785,7 @@ public void start(
778785
stateReader,
779786
getWorkItem().getIsNewKey(),
780787
cacheForKey.forFamily(stateFamily),
781-
WindmillStateTagUtil.instance(),
788+
windmillTagEncoding,
782789
scopedReadStateSupplier);
783790

784791
this.systemTimerInternals =
@@ -787,7 +794,7 @@ public void start(
787794
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
788795
processingTime,
789796
watermarks,
790-
WindmillStateTagUtil.instance(),
797+
windmillTagEncoding,
791798
td -> {});
792799

793800
this.userTimerInternals =
@@ -796,7 +803,7 @@ public void start(
796803
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
797804
processingTime,
798805
watermarks,
799-
WindmillStateTagUtil.instance(),
806+
windmillTagEncoding,
800807
this::onUserTimerModified);
801808

802809
this.cachedFiredSystemTimers = null;
@@ -824,12 +831,11 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
824831
&& timer.getStateFamily().equals(stateFamily))
825832
.transform(
826833
timer ->
827-
WindmillStateTagUtil.instance()
828-
.windmillTimerToTimerData(
829-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
830-
timer,
831-
windowCoder,
832-
getDrainMode()))
834+
windmillTagEncoding.windmillTimerToTimerData(
835+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
836+
timer,
837+
windowCoder,
838+
getDrainMode()))
833839
.iterator();
834840
}
835841

@@ -888,12 +894,11 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
888894
&& timer.getStateFamily().equals(stateFamily))
889895
.transform(
890896
timer ->
891-
WindmillStateTagUtil.instance()
892-
.windmillTimerToTimerData(
893-
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
894-
timer,
895-
windowCoder,
896-
getDrainMode()))
897+
windmillTagEncoding.windmillTimerToTimerData(
898+
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
899+
timer,
900+
windowCoder,
901+
getDrainMode()))
897902
.iterator());
898903
}
899904

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.beam.runners.core.TimerInternals.TimerData;
3131
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3232
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
33-
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
33+
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
3434
import org.apache.beam.sdk.coders.Coder;
3535
import org.apache.beam.sdk.coders.KvCoder;
3636
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -68,19 +68,22 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
6868
private final transient Coder<? extends BoundedWindow> windowCoder;
6969
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
7070
private final transient Coder<ElemT> valueCoder;
71+
private final WindmillTagEncoding windmillTagEncoding;
7172

7273
public WindmillKeyedWorkItem(
7374
K key,
7475
Windmill.WorkItem workItem,
7576
Coder<? extends BoundedWindow> windowCoder,
7677
Coder<Collection<? extends BoundedWindow>> windowsCoder,
7778
Coder<ElemT> valueCoder,
79+
WindmillTagEncoding windmillTagEncoding,
7880
boolean drainMode) {
7981
this.key = key;
8082
this.workItem = workItem;
8183
this.windowCoder = windowCoder;
8284
this.windowsCoder = windowsCoder;
8385
this.valueCoder = valueCoder;
86+
this.windmillTagEncoding = windmillTagEncoding;
8487
this.drainMode = drainMode;
8588
}
8689

@@ -98,12 +101,11 @@ public Iterable<TimerData> timersIterable() {
98101
.append(nonEventTimers)
99102
.transform(
100103
timer ->
101-
WindmillStateTagUtil.instance()
102-
.windmillTimerToTimerData(
103-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
104-
timer,
105-
windowCoder,
106-
drainMode));
104+
windmillTagEncoding.windmillTimerToTimerData(
105+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
106+
timer,
107+
windowCoder,
108+
drainMode));
107109
}
108110

109111
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
3030
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3131
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
32-
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
32+
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
3333
import org.apache.beam.sdk.state.TimeDomain;
3434
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3535
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -61,26 +61,26 @@ class WindmillTimerInternals implements TimerInternals {
6161
private final String stateFamily;
6262
private final WindmillNamespacePrefix prefix;
6363
private final Consumer<TimerData> onTimerModified;
64-
private final WindmillStateTagUtil windmillStateTagUtil;
64+
private final WindmillTagEncoding windmillTagEncoding;
6565

6666
public WindmillTimerInternals(
6767
String stateFamily, // unique identifies a step
6868
WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
6969
Instant processingTime,
7070
Watermarks watermarks,
71-
WindmillStateTagUtil windmillStateTagUtil,
71+
WindmillTagEncoding windmillTagEncoding,
7272
Consumer<TimerData> onTimerModified) {
7373
this.watermarks = watermarks;
7474
this.processingTime = checkNotNull(processingTime);
7575
this.stateFamily = stateFamily;
7676
this.prefix = prefix;
77-
this.windmillStateTagUtil = windmillStateTagUtil;
77+
this.windmillTagEncoding = windmillTagEncoding;
7878
this.onTimerModified = onTimerModified;
7979
}
8080

8181
public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
8282
return new WindmillTimerInternals(
83-
stateFamily, prefix, processingTime, watermarks, windmillStateTagUtil, onTimerModified);
83+
stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified);
8484
}
8585

8686
@Override
@@ -187,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
187187
TimerData timerData = value.getKey();
188188

189189
Timer.Builder timer =
190-
windmillStateTagUtil.buildWindmillTimerFromTimerData(
190+
windmillTagEncoding.buildWindmillTimerFromTimerData(
191191
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());
192192

193193
if (value.getValue()) {
@@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
201201
// Setting a timer, clear any prior hold and set to the new value
202202
outputBuilder
203203
.addWatermarkHoldsBuilder()
204-
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
204+
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
205205
.setStateFamily(stateFamily)
206206
.setReset(true)
207207
.addTimestamps(
@@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
210210
// Clear the hold in case a previous iteration of this timer set one.
211211
outputBuilder
212212
.addWatermarkHoldsBuilder()
213-
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
213+
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
214214
.setStateFamily(stateFamily)
215215
.setReset(true);
216216
}
@@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
225225
// We are deleting timer; clear the hold
226226
outputBuilder
227227
.addWatermarkHoldsBuilder()
228-
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
228+
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
229229
.setStateFamily(stateFamily)
230230
.setReset(true);
231231
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,13 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
120120
final WorkItem workItem = context.getWorkItem();
121121
KeyedWorkItem<K, T> keyedWorkItem =
122122
new WindmillKeyedWorkItem<>(
123-
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
123+
key,
124+
workItem,
125+
windowCoder,
126+
windowsCoder,
127+
valueCoder,
128+
context.getWindmillTagEncoding(),
129+
context.getDrainMode());
124130
final boolean isEmptyWorkItem =
125131
(Iterables.isEmpty(keyedWorkItem.timersIterable())
126132
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ final class CachingStateTable {
4848
private final @Nullable CachingStateTable derivedStateTable;
4949
private final boolean isNewKey;
5050
private final boolean mapStateViaMultimapState;
51-
private final WindmillStateTagUtil windmillStateTagUtil;
51+
private final WindmillTagEncoding windmillTagEncoding;
5252

5353
private CachingStateTable(Builder builder) {
5454
this.stateTable = new HashMap<>();
@@ -60,7 +60,7 @@ private CachingStateTable(Builder builder) {
6060
this.scopedReadStateSupplier = builder.scopedReadStateSupplier;
6161
this.derivedStateTable = builder.derivedStateTable;
6262
this.mapStateViaMultimapState = builder.mapStateViaMultimapState;
63-
this.windmillStateTagUtil = builder.windmillStateTagUtil;
63+
this.windmillTagEncoding = builder.windmillTagEncoding;
6464
if (this.isSystemTable) {
6565
Preconditions.checkState(derivedStateTable == null);
6666
} else {
@@ -74,9 +74,9 @@ static Builder builder(
7474
ForKeyAndFamily cache,
7575
boolean isNewKey,
7676
Supplier<Closeable> scopedReadStateSupplier,
77-
WindmillStateTagUtil windmillStateTagUtil) {
77+
WindmillTagEncoding windmillTagEncoding) {
7878
return new Builder(
79-
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil);
79+
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillTagEncoding);
8080
}
8181

8282
/**
@@ -114,7 +114,7 @@ private StateTag.StateBinder binderForNamespace(StateNamespace namespace, StateC
114114
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
115115
StateTag<BagState<T>> resolvedAddress =
116116
isSystemTable ? StateTags.makeSystemTagInternal(address) : address;
117-
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress);
117+
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, resolvedAddress);
118118

119119
@Nullable WindmillBag<T> bag = (WindmillBag<T>) cache.get(namespace, encodedKey);
120120
if (bag == null) {
@@ -144,7 +144,7 @@ public <KeyT, ValueT> AbstractWindmillMap<KeyT, ValueT> bindMap(
144144
new WindmillMapViaMultimap<>(
145145
bindMultimap(internalMultimapAddress, keyCoder, valueCoder));
146146
} else {
147-
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
147+
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec);
148148
result = (AbstractWindmillMap<KeyT, ValueT>) cache.get(namespace, encodedKey);
149149
if (result == null) {
150150
result =
@@ -161,7 +161,7 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
161161
StateTag<MultimapState<KeyT, ValueT>> spec,
162162
Coder<KeyT> keyCoder,
163163
Coder<ValueT> valueCoder) {
164-
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
164+
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec);
165165
WindmillMultimap<KeyT, ValueT> result =
166166
(WindmillMultimap<KeyT, ValueT>) cache.get(namespace, encodedKey);
167167
if (result == null) {
@@ -177,8 +177,7 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
177177
public <T> OrderedListState<T> bindOrderedList(
178178
StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
179179
StateTag<OrderedListState<T>> specOrInternalTag = addressOrInternalTag(spec);
180-
InternedByteString encodedKey =
181-
windmillStateTagUtil.encodeKey(namespace, specOrInternalTag);
180+
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, specOrInternalTag);
182181

183182
WindmillOrderedList<T> result = (WindmillOrderedList<T>) cache.get(namespace, encodedKey);
184183
if (result == null) {
@@ -202,7 +201,7 @@ public WatermarkHoldState bindWatermark(
202201
StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
203202
StateTag<WatermarkHoldState> addressOrInternalTag = addressOrInternalTag(address);
204203
InternedByteString encodedKey =
205-
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
204+
windmillTagEncoding.stateTag(namespace, addressOrInternalTag);
206205

207206
WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, encodedKey);
208207
if (result == null) {
@@ -231,7 +230,7 @@ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCom
231230
combineFn,
232231
cache,
233232
isNewKey,
234-
windmillStateTagUtil);
233+
windmillTagEncoding);
235234

236235
result.initializeForWorkItem(reader, scopedReadStateSupplier);
237236
return result;
@@ -251,7 +250,7 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
251250
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
252251
StateTag<ValueState<T>> addressOrInternalTag = addressOrInternalTag(address);
253252
InternedByteString encodedKey =
254-
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
253+
windmillTagEncoding.stateTag(namespace, addressOrInternalTag);
255254

256255
WindmillValue<T> result = (WindmillValue<T>) cache.get(namespace, encodedKey);
257256
if (result == null) {
@@ -289,7 +288,7 @@ static class Builder {
289288
private final WindmillStateCache.ForKeyAndFamily cache;
290289
private final Supplier<Closeable> scopedReadStateSupplier;
291290
private final boolean isNewKey;
292-
private final WindmillStateTagUtil windmillStateTagUtil;
291+
private final WindmillTagEncoding windmillTagEncoding;
293292
private boolean isSystemTable;
294293
private @Nullable CachingStateTable derivedStateTable;
295294
private boolean mapStateViaMultimapState = false;
@@ -300,15 +299,15 @@ private Builder(
300299
ForKeyAndFamily cache,
301300
Supplier<Closeable> scopedReadStateSupplier,
302301
boolean isNewKey,
303-
WindmillStateTagUtil windmillStateTagUtil) {
302+
WindmillTagEncoding windmillTagEncoding) {
304303
this.stateFamily = stateFamily;
305304
this.reader = reader;
306305
this.cache = cache;
307306
this.scopedReadStateSupplier = scopedReadStateSupplier;
308307
this.isNewKey = isNewKey;
309308
this.isSystemTable = true;
310309
this.derivedStateTable = null;
311-
this.windmillStateTagUtil = windmillStateTagUtil;
310+
this.windmillTagEncoding = windmillTagEncoding;
312311
}
313312

314313
Builder withDerivedState(CachingStateTable derivedStateTable) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class WindmillCombiningState<InputT, AccumT, OutputT> extends WindmillState
6060
CombineFn<InputT, AccumT, OutputT> combineFn,
6161
ForKeyAndFamily cache,
6262
boolean isNewKey,
63-
WindmillStateTagUtil windmillStateTagUtil) {
63+
WindmillTagEncoding windmillTagEncoding) {
6464
StateTag<BagState<AccumT>> internalBagAddress = StateTags.convertToBagTagInternal(address);
65-
InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress);
65+
InternedByteString encodeKey = windmillTagEncoding.stateTag(namespace, internalBagAddress);
6666

6767
WindmillBag<AccumT> bag = (WindmillBag<AccumT>) cache.get(namespace, encodeKey);
6868
if (bag == null) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ public WindmillStateInternals(
6161
WindmillStateReader reader,
6262
boolean isNewKey,
6363
WindmillStateCache.ForKeyAndFamily cache,
64-
WindmillStateTagUtil windmillStateTagUtil,
64+
WindmillTagEncoding windmillTagEncoding,
6565
Supplier<Closeable> scopedReadStateSupplier) {
6666
this.key = key;
6767
this.cache = cache;
6868
this.scopedReadStateSupplier = scopedReadStateSupplier;
6969
CachingStateTable.Builder builder =
7070
CachingStateTable.builder(
71-
stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillStateTagUtil);
71+
stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillTagEncoding);
7272
if (cache.supportMapStateViaMultimapState()) {
7373
builder = builder.withMapStateViaMultimapState();
7474
}

0 commit comments

Comments
 (0)