diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index c8ff7840bd1d..d734d629711a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -61,7 +61,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.UnboundedSource; @@ -119,6 +120,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext, Map>> sideInputCache; + private final WindmillTagEncoding windmillTagEncoding; /** * The current user-facing key for this execution context. * @@ -169,6 +171,7 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); + this.windmillTagEncoding = WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; @@ -200,6 +203,10 @@ public boolean getDrainMode() { return work != null ? work.getDrainMode() : false; } + public WindmillTagEncoding getWindmillTagEncoding() { + return windmillTagEncoding; + } + public boolean offsetBasedDeduplicationSupported() { return activeReader != null && activeReader.getCurrentSource().offsetBasedDeduplicationSupported(); @@ -777,7 +784,7 @@ public void start( stateReader, getWorkItem().getIsNewKey(), cacheForKey.forFamily(stateFamily), - WindmillStateTagUtil.instance(), + windmillTagEncoding, scopedReadStateSupplier); this.systemTimerInternals = @@ -786,7 +793,7 @@ public void start( WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, processingTime, watermarks, - WindmillStateTagUtil.instance(), + windmillTagEncoding, td -> {}); this.userTimerInternals = @@ -795,7 +802,7 @@ public void start( WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, processingTime, watermarks, - WindmillStateTagUtil.instance(), + windmillTagEncoding, this::onUserTimerModified); this.cachedFiredSystemTimers = null; @@ -823,12 +830,11 @@ public TimerData getNextFiredTimer(Coder windowCode && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator(); } @@ -887,12 +893,11 @@ public TimerData getNextFiredUserTimer(Coder window && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index ad5f2b0dd40a..1f99d929898c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -30,7 +30,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -68,6 +68,7 @@ public class WindmillKeyedWorkItem implements KeyedWorkItem private final transient Coder windowCoder; private final transient Coder> windowsCoder; private final transient Coder valueCoder; + private final WindmillTagEncoding windmillTagEncoding; public WindmillKeyedWorkItem( K key, @@ -75,12 +76,14 @@ public WindmillKeyedWorkItem( Coder windowCoder, Coder> windowsCoder, Coder valueCoder, + WindmillTagEncoding windmillTagEncoding, boolean drainMode) { this.key = key; this.workItem = workItem; this.windowCoder = windowCoder; this.windowsCoder = windowsCoder; this.valueCoder = valueCoder; + this.windmillTagEncoding = windmillTagEncoding; this.drainMode = drainMode; } @@ -98,12 +101,11 @@ public Iterable timersIterable() { .append(nonEventTimers) .transform( timer -> - WindmillStateTagUtil.instance() - .windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - drainMode)); + windmillTagEncoding.windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, + windowCoder, + drainMode)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index e1d89fc10a17..cb41aa1ccab4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -29,7 +29,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -61,26 +61,26 @@ class WindmillTimerInternals implements TimerInternals { private final String stateFamily; private final WindmillNamespacePrefix prefix; private final Consumer onTimerModified; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; public WindmillTimerInternals( String stateFamily, // unique identifies a step WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s" Instant processingTime, Watermarks watermarks, - WindmillStateTagUtil windmillStateTagUtil, + WindmillTagEncoding windmillTagEncoding, Consumer onTimerModified) { this.watermarks = watermarks; this.processingTime = checkNotNull(processingTime); this.stateFamily = stateFamily; this.prefix = prefix; - this.windmillStateTagUtil = windmillStateTagUtil; + this.windmillTagEncoding = windmillTagEncoding; this.onTimerModified = onTimerModified; } public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) { return new WindmillTimerInternals( - stateFamily, prefix, processingTime, watermarks, windmillStateTagUtil, onTimerModified); + stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified); } @Override @@ -187,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { TimerData timerData = value.getKey(); Timer.Builder timer = - windmillStateTagUtil.buildWindmillTimerFromTimerData( + windmillTagEncoding.buildWindmillTimerFromTimerData( stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder()); if (value.getValue()) { @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Setting a timer, clear any prior hold and set to the new value outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true) .addTimestamps( @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // Clear the hold in case a previous iteration of this timer set one. outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true); } @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { // We are deleting timer; clear the hold outputBuilder .addWatermarkHoldsBuilder() - .setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData)) + .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData)) .setStateFamily(stateFamily) .setReset(true); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java index f4a6eec61cbf..7dd55d91211d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java @@ -120,7 +120,13 @@ public NativeReaderIterator>> iterator() throw final WorkItem workItem = context.getWorkItem(); KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode()); + key, + workItem, + windowCoder, + windowsCoder, + valueCoder, + context.getWindmillTagEncoding(), + context.getDrainMode()); final boolean isEmptyWorkItem = (Iterables.isEmpty(keyedWorkItem.timersIterable()) && Iterables.isEmpty(keyedWorkItem.elementsIterable())); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java index 3ea1fa876263..5144089f9ef6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/CachingStateTable.java @@ -48,7 +48,7 @@ final class CachingStateTable { private final @Nullable CachingStateTable derivedStateTable; private final boolean isNewKey; private final boolean mapStateViaMultimapState; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; private CachingStateTable(Builder builder) { this.stateTable = new HashMap<>(); @@ -60,7 +60,7 @@ private CachingStateTable(Builder builder) { this.scopedReadStateSupplier = builder.scopedReadStateSupplier; this.derivedStateTable = builder.derivedStateTable; this.mapStateViaMultimapState = builder.mapStateViaMultimapState; - this.windmillStateTagUtil = builder.windmillStateTagUtil; + this.windmillTagEncoding = builder.windmillTagEncoding; if (this.isSystemTable) { Preconditions.checkState(derivedStateTable == null); } else { @@ -74,9 +74,9 @@ static Builder builder( ForKeyAndFamily cache, boolean isNewKey, Supplier scopedReadStateSupplier, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { return new Builder( - stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil); + stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillTagEncoding); } /** @@ -114,7 +114,7 @@ private StateTag.StateBinder binderForNamespace(StateNamespace namespace, StateC public BagState bindBag(StateTag> address, Coder elemCoder) { StateTag> resolvedAddress = isSystemTable ? StateTags.makeSystemTagInternal(address) : address; - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, resolvedAddress); @Nullable WindmillBag bag = (WindmillBag) cache.get(namespace, encodedKey); if (bag == null) { @@ -144,7 +144,7 @@ public AbstractWindmillMap bindMap( new WindmillMapViaMultimap<>( bindMultimap(internalMultimapAddress, keyCoder, valueCoder)); } else { - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec); result = (AbstractWindmillMap) cache.get(namespace, encodedKey); if (result == null) { result = @@ -161,7 +161,7 @@ public WindmillMultimap bindMultimap( StateTag> spec, Coder keyCoder, Coder valueCoder) { - InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec); WindmillMultimap result = (WindmillMultimap) cache.get(namespace, encodedKey); if (result == null) { @@ -177,8 +177,7 @@ public WindmillMultimap bindMultimap( public OrderedListState bindOrderedList( StateTag> spec, Coder elemCoder) { StateTag> specOrInternalTag = addressOrInternalTag(spec); - InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, specOrInternalTag); + InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, specOrInternalTag); WindmillOrderedList result = (WindmillOrderedList) cache.get(namespace, encodedKey); if (result == null) { @@ -202,7 +201,7 @@ public WatermarkHoldState bindWatermark( StateTag address, TimestampCombiner timestampCombiner) { StateTag addressOrInternalTag = addressOrInternalTag(address); InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag); + windmillTagEncoding.stateTag(namespace, addressOrInternalTag); WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, encodedKey); if (result == null) { @@ -231,7 +230,7 @@ public CombiningState bindCom combineFn, cache, isNewKey, - windmillStateTagUtil); + windmillTagEncoding); result.initializeForWorkItem(reader, scopedReadStateSupplier); return result; @@ -251,7 +250,7 @@ CombiningState bindCombiningValueWithContext( public ValueState bindValue(StateTag> address, Coder coder) { StateTag> addressOrInternalTag = addressOrInternalTag(address); InternedByteString encodedKey = - windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag); + windmillTagEncoding.stateTag(namespace, addressOrInternalTag); WindmillValue result = (WindmillValue) cache.get(namespace, encodedKey); if (result == null) { @@ -289,7 +288,7 @@ static class Builder { private final WindmillStateCache.ForKeyAndFamily cache; private final Supplier scopedReadStateSupplier; private final boolean isNewKey; - private final WindmillStateTagUtil windmillStateTagUtil; + private final WindmillTagEncoding windmillTagEncoding; private boolean isSystemTable; private @Nullable CachingStateTable derivedStateTable; private boolean mapStateViaMultimapState = false; @@ -300,7 +299,7 @@ private Builder( ForKeyAndFamily cache, Supplier scopedReadStateSupplier, boolean isNewKey, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { this.stateFamily = stateFamily; this.reader = reader; this.cache = cache; @@ -308,7 +307,7 @@ private Builder( this.isNewKey = isNewKey; this.isSystemTable = true; this.derivedStateTable = null; - this.windmillStateTagUtil = windmillStateTagUtil; + this.windmillTagEncoding = windmillTagEncoding; } Builder withDerivedState(CachingStateTable derivedStateTable) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java index 9ed31f250389..3da3ed7fad1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillCombiningState.java @@ -60,9 +60,9 @@ class WindmillCombiningState extends WindmillState CombineFn combineFn, ForKeyAndFamily cache, boolean isNewKey, - WindmillStateTagUtil windmillStateTagUtil) { + WindmillTagEncoding windmillTagEncoding) { StateTag> internalBagAddress = StateTags.convertToBagTagInternal(address); - InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress); + InternedByteString encodeKey = windmillTagEncoding.stateTag(namespace, internalBagAddress); WindmillBag bag = (WindmillBag) cache.get(namespace, encodeKey); if (bag == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java index ecf64c1fc84f..db036bee43c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java @@ -61,14 +61,14 @@ public WindmillStateInternals( WindmillStateReader reader, boolean isNewKey, WindmillStateCache.ForKeyAndFamily cache, - WindmillStateTagUtil windmillStateTagUtil, + WindmillTagEncoding windmillTagEncoding, Supplier scopedReadStateSupplier) { this.key = key; this.cache = cache; this.scopedReadStateSupplier = scopedReadStateSupplier; CachingStateTable.Builder builder = CachingStateTable.builder( - stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillStateTagUtil); + stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillTagEncoding); if (cache.supportMapStateViaMultimapState()) { builder = builder.withMapStateViaMultimapState(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java new file mode 100644 index 000000000000..59841f67347d --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.state; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; + +@Internal +@ThreadSafe +/* + * Windmill StateTag, TimerTag encoding interface + */ +public abstract class WindmillTagEncoding { + + protected static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); + + protected static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = + BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); + + /** Encodes state tag */ + public abstract InternedByteString stateTag(StateNamespace namespace, StateTag address); + + /** + * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark + * hold that is only freed after the timer fires. + */ + public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData); + + /** + * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and + * timestamp. + * + *

This is necessary because Windmill will deduplicate based only on this tag. + */ + public abstract ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData); + + /** Converts Windmill Timer to beam TimerData */ + public abstract TimerData windmillTimerToTimerData( + WindmillNamespacePrefix prefix, + Timer timer, + Coder windowCoder, + boolean draining); + + /** + * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. + * + * @return the input builder for chaining + */ + public Timer.Builder buildWindmillTimerFromTimerData( + @Nullable String stateFamily, + WindmillNamespacePrefix prefix, + TimerData timerData, + Timer.Builder builder) { + + builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); + + if (stateFamily != null) { + builder.setStateFamily(stateFamily); + } + + builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); + + // Store the output timestamp in the metadata timestamp. + Instant outputTimestamp = timerData.getOutputTimestamp(); + if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of + // the global window + // here instead. + outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; + } + builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); + return builder; + } + + protected static Timer.Type timerType(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return Timer.Type.WATERMARK; + case PROCESSING_TIME: + return Timer.Type.REALTIME; + case SYNCHRONIZED_PROCESSING_TIME: + return Timer.Type.DEPENDENT_REALTIME; + default: + throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); + } + } + + protected static TimeDomain timerTypeToTimeDomain(Timer.Type type) { + switch (type) { + case REALTIME: + return TimeDomain.PROCESSING_TIME; + case DEPENDENT_REALTIME: + return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + case WATERMARK: + return TimeDomain.EVENT_TIME; + default: + throw new IllegalArgumentException("Unsupported timer type " + type); + } + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java similarity index 73% rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java rename to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java index 6f6ea02938ed..19e31351a52b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java @@ -30,42 +30,28 @@ import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.util.VarInt; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; import org.joda.time.Instant; @Internal @ThreadSafe -public class WindmillStateTagUtil { - private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = - GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); - - private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = - BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); +public class WindmillTagEncodingV1 extends WindmillTagEncoding { private static final String TIMER_HOLD_PREFIX = "/h"; - private static final WindmillStateTagUtil INSTANCE = new WindmillStateTagUtil(); + private static final WindmillTagEncodingV1 INSTANCE = new WindmillTagEncodingV1(); // Private constructor to prevent instantiations from outside. - private WindmillStateTagUtil() {} + private WindmillTagEncodingV1() {} - /** - * Encodes the given namespace and address as {@code <namespace>+<address>}. The - * returned InternedByteStrings are weakly interned to reduce memory usage and reduce GC pressure. - */ - @VisibleForTesting - InternedByteString encodeKey(StateNamespace namespace, StateTag address) { + /** {@inheritDoc} */ + @Override + public InternedByteString stateTag(StateNamespace namespace, StateTag address) { try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) { // Use ByteStringOutputStream rather than concatenation and String.format. We build these keys // a lot, and this leads to better performance results. See associated benchmarks. @@ -82,10 +68,8 @@ InternedByteString encodeKey(StateNamespace namespace, StateTag address) { } } - /** - * Produce a state tag that is guaranteed to be unique for the given timer, to add a watermark - * hold that is only freed after the timer fires. - */ + /** {@inheritDoc} */ + @Override public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; if ("".equals(timerData.getTimerFamilyId())) { @@ -118,12 +102,8 @@ public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerDa return ByteString.copyFromUtf8(tagString); } - /** - * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and - * timestamp. - * - *

This is necessary because Windmill will deduplicate based only on this tag. - */ + /** {@inheritDoc} */ + @Override public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { String tagString; if (useNewTimerTagEncoding(timerData)) { @@ -151,6 +131,8 @@ public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) return ByteString.copyFromUtf8(tagString); } + /** {@inheritDoc} */ + @Override public TimerData windmillTimerToTimerData( WindmillNamespacePrefix prefix, Timer timer, @@ -253,69 +235,12 @@ public TimerData windmillTimerToTimerData( } - /** - * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. - * - * @return the input builder for chaining - */ - public Timer.Builder buildWindmillTimerFromTimerData( - @Nullable String stateFamily, - WindmillNamespacePrefix prefix, - TimerData timerData, - Timer.Builder builder) { - - builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); - - if (stateFamily != null) { - builder.setStateFamily(stateFamily); - } - - builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); - - // Store the output timestamp in the metadata timestamp. - Instant outputTimestamp = timerData.getOutputTimestamp(); - if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of - // the global window - // here instead. - outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; - } - builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); - return builder; - } - - private static Timer.Type timerType(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return Timer.Type.WATERMARK; - case PROCESSING_TIME: - return Timer.Type.REALTIME; - case SYNCHRONIZED_PROCESSING_TIME: - return Timer.Type.DEPENDENT_REALTIME; - default: - throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); - } - } - - private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) { - switch (type) { - case REALTIME: - return TimeDomain.PROCESSING_TIME; - case DEPENDENT_REALTIME: - return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; - case WATERMARK: - return TimeDomain.EVENT_TIME; - default: - throw new IllegalArgumentException("Unsupported timer type " + type); - } - } - private static boolean useNewTimerTagEncoding(TimerData timerData) { return !timerData.getTimerFamilyId().isEmpty(); } /** @return the singleton WindmillStateTagUtil */ - public static WindmillStateTagUtil instance() { + public static WindmillTagEncodingV1 instance() { return INSTANCE; } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 1ae9678eed60..094623b81311 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -50,7 +50,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.InputMessageBundle; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -149,7 +149,7 @@ private void addTimer( .getTimersBuilder() .addTimersBuilder() .setTag( - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .timerTag( WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, TimerData.of( @@ -196,7 +196,13 @@ private WindowedValue> createValue( return new ValueInEmptyWindows<>( (KeyedWorkItem) new WindmillKeyedWorkItem<>( - KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false)); + KEY, + workItem.build(), + windowCoder, + wildcardWindowsCoder, + valueCoder, + WindmillTagEncodingV1.instance(), + false)); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java index 52c9844add86..bdeefcebb2ac 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.InputMessageBundle; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CollectionCoder; @@ -132,7 +133,13 @@ private WindowedValue> createValue( return new ValueInEmptyWindows<>( (KeyedWorkItem) new WindmillKeyedWorkItem<>( - KEY, workItem.build(), windowCoder, wildcardWindowsCoder, valueCoder, false)); + KEY, + workItem.build(), + windowCoder, + wildcardWindowsCoder, + valueCoder, + WindmillTagEncodingV1.instance(), + false)); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index a722b454e38e..2227c25ef15d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -30,7 +30,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.FakeKeyedWorkItemCoder; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; -import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -78,9 +79,12 @@ public class WindmillKeyedWorkItemTest { private static final StateNamespace STATE_NAMESPACE_2 = StateNamespaces.window(WINDOW_CODER, WINDOW_2); + public WindmillTagEncoding windmillTagEncoding; + @Before public void setUp() { MockitoAnnotations.initMocks(this); + windmillTagEncoding = WindmillTagEncodingV1.instance(); } @Test @@ -97,7 +101,13 @@ public void testElementIteration() throws Exception { KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, false); + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + false); assertThat( keyedWorkItem.elementsIterable(), @@ -170,7 +180,8 @@ public void testTimerOrdering() throws Exception { .build(); KeyedWorkItem keyedWorkItem = - new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, false); + new WindmillKeyedWorkItem<>( + KEY, workItem, WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, windmillTagEncoding, false); assertThat( keyedWorkItem.timersIterable(), @@ -181,18 +192,17 @@ public void testTimerOrdering() throws Exception { makeTimer(STATE_NAMESPACE_1, 2, TimeDomain.PROCESSING_TIME))); } - private static Windmill.Timer makeSerializedTimer( + private Windmill.Timer makeSerializedTimer( StateNamespace ns, long timestamp, Windmill.Timer.Type type) { return Windmill.Timer.newBuilder() .setTag( - WindmillStateTagUtil.instance() - .timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - TimerData.of( - ns, - new Instant(timestamp), - new Instant(timestamp), - timerTypeToTimeDomain(type)))) + windmillTagEncoding.timerTag( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + TimerData.of( + ns, + new Instant(timestamp), + new Instant(timestamp), + timerTypeToTimeDomain(type)))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -244,7 +254,13 @@ public void testDrainPropagated() throws Exception { .build()); KeyedWorkItem keyedWorkItem = new WindmillKeyedWorkItem<>( - KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, true); + KEY, + workItem.build(), + WINDOW_CODER, + WINDOWS_CODER, + VALUE_CODER, + windmillTagEncoding, + true); Iterator> iterator = keyedWorkItem.elementsIterable().iterator(); Assert.assertTrue(iterator.next().causedByDrain()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index 40b292298959..bbb8e4c93c07 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -60,6 +60,8 @@ public class WindmillStateCacheTest { private static final long MEGABYTES = 1024 * 1024; DataflowWorkerHarnessOptions options; + WindmillTagEncoding windmillTagEncoding; + private static class TestStateTag implements StateTag { final String id; @@ -150,21 +152,20 @@ private static WindmillComputationKey computationKey( return WindmillComputationKey.create(computationId, ByteString.copyFromUtf8(key), shardingKey); } - private static Optional getFromCache( + private Optional getFromCache( WindmillStateCache.ForKeyAndFamily keyCache, StateNamespace namespace, StateTag address) { return (Optional) Optional.ofNullable( - keyCache.get(namespace, WindmillStateTagUtil.instance().encodeKey(namespace, address))); + keyCache.get(namespace, windmillTagEncoding.stateTag(namespace, address))); } - private static void putInCache( + private void putInCache( WindmillStateCache.ForKeyAndFamily keyCache, StateNamespace namespace, StateTag tag, T value, long weight) { - keyCache.put( - namespace, WindmillStateTagUtil.instance().encodeKey(namespace, tag), value, weight); + keyCache.put(namespace, windmillTagEncoding.stateTag(namespace, tag), value, weight); } WindmillStateCache cache; @@ -172,6 +173,7 @@ private static void putInCache( @Before public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); + windmillTagEncoding = WindmillTagEncodingV1.instance(); cache = WindmillStateCache.builder().setSizeMb(400).build(); assertEquals(0, cache.getWeight()); } @@ -188,14 +190,14 @@ public void conflictingUserAndSystemTags() { WindmillValue userValue = new WindmillValue<>( StateNamespaces.global(), - WindmillStateTagUtil.instance().encodeKey(StateNamespaces.global(), userTag), + windmillTagEncoding.stateTag(StateNamespaces.global(), userTag), STATE_FAMILY, StringUtf8Coder.of(), false); WindmillValue systemValue = new WindmillValue<>( StateNamespaces.global(), - WindmillStateTagUtil.instance().encodeKey(StateNamespaces.global(), systemTag), + windmillTagEncoding.stateTag(StateNamespaces.global(), systemTag), STATE_FAMILY, StringUtf8Coder.of(), false); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 1a31e7b8d685..7a06d3a29493 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -128,6 +128,7 @@ public class WindmillStateInternalsTest { private WindmillStateInternals underTest; private WindmillStateInternals underTestNewKey; private WindmillStateInternals underTestMapViaMultimap; + private WindmillTagEncoding windmillTagEncoding; private WindmillStateCache cache; private WindmillStateCache cacheViaMultimap; @Mock private Supplier readStateSupplier; @@ -216,6 +217,7 @@ public void setUp() { public void resetUnderTest() { workToken++; + windmillTagEncoding = WindmillTagEncodingV1.instance(); underTest = new WindmillStateInternals<>( "dummyKey", @@ -230,7 +232,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); underTestNewKey = new WindmillStateInternals( @@ -246,7 +248,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); underTestMapViaMultimap = new WindmillStateInternals( @@ -262,7 +264,7 @@ public void resetUnderTest() { 17L, workToken) .forFamily(STATE_FAMILY), - WindmillStateTagUtil.instance(), + windmillTagEncoding, readStateSupplier); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java similarity index 93% rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java rename to runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index eb4713695dc6..73acdf937811 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -50,7 +50,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class WindmillStateTagUtilTest { +public class WindmillTagEncodingV1Test { private static final List, StateNamespace>> TEST_NAMESPACES_WITH_CODERS = ImmutableList.of( @@ -80,15 +80,15 @@ public class WindmillStateTagUtilTest { ImmutableList.of("", "foo", "this one has spaces", "this/one/has/slashes", "/"); @Test - public void testEncodeKey() { + public void testStateTag() { StateNamespaceForTest namespace = new StateNamespaceForTest("key"); StateTag> foo = StateTags.set("foo", VarIntCoder.of()); - InternedByteString bytes = WindmillStateTagUtil.instance().encodeKey(namespace, foo); + InternedByteString bytes = WindmillTagEncodingV1.instance().stateTag(namespace, foo); assertEquals("key+ufoo", bytes.byteString().toStringUtf8()); } @Test - public void testEncodeKeyNested() { + public void testStateTagNested() { // Hypothetical case where a namespace/tag encoding depends on a call to encodeKey // This tests if thread locals in WindmillStateUtil are not reused with nesting StateNamespaceForTest namespace1 = new StateNamespaceForTest("key"); @@ -97,7 +97,7 @@ public void testEncodeKeyNested() { new StateTag>() { @Override public void appendTo(Appendable sb) throws IOException { - WindmillStateTagUtil.instance().encodeKey(namespace1, tag1); + WindmillTagEncodingV1.instance().stateTag(namespace1, tag1); sb.append("tag2"); } @@ -121,11 +121,11 @@ public SetState bind(StateBinder binder) { new StateNamespaceForTest("key") { @Override public void appendTo(Appendable sb) throws IOException { - WindmillStateTagUtil.instance().encodeKey(namespace1, tag1); + WindmillTagEncodingV1.instance().stateTag(namespace1, tag1); sb.append("namespace2"); } }; - InternedByteString bytes = WindmillStateTagUtil.instance().encodeKey(namespace2, tag2); + InternedByteString bytes = WindmillTagEncodingV1.instance().stateTag(namespace2, tag2); assertEquals("namespace2+tag2", bytes.byteString().toStringUtf8()); } @@ -152,10 +152,10 @@ public void testTimerDataToFromTimer() { ? BoundedWindow.TIMESTAMP_MIN_VALUE : timer.getOutputTimestamp(); TimerData computed = - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .windmillTimerToTimerData( prefix, - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(), @@ -205,10 +205,10 @@ public void testTimerDataToFromTimer() { expectedTimestamp, timer.getDomain()); assertThat( - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .windmillTimerToTimerData( prefix, - WindmillStateTagUtil.instance() + WindmillTagEncodingV1.instance() .buildWindmillTimerFromTimerData( stateFamily, prefix, timer, Timer.newBuilder()) .build(),