From ec95b20d924367cf3b75f6f4289ae70f010853a1 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 5 Dec 2025 07:16:14 +0000 Subject: [PATCH] [Dataflow Streaming] Move timer tag logic to WindmillStateTagUtil --- .../worker/StreamingModeExecutionContext.java | 22 +- .../worker/WindmillKeyedWorkItem.java | 12 +- .../worker/WindmillTimerInternals.java | 219 +----------------- .../windmill/state/WindmillStateTagUtil.java | 215 +++++++++++++++++ .../StreamingGroupAlsoByWindowFnsTest.java | 20 +- .../worker/WindmillKeyedWorkItemTest.java | 29 ++- .../worker/WindmillTimerInternalsTest.java | 160 ------------- .../state/WindmillStateTagUtilTest.java | 139 +++++++++++ 8 files changed, 407 insertions(+), 409 deletions(-) delete mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index e3424e3d6670..c8ff7840bd1d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -823,11 +823,12 @@ public TimerData getNextFiredTimer(Coder windowCode && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + WindmillStateTagUtil.instance() + .windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator(); } @@ -886,11 +887,12 @@ public TimerData getNextFiredUserTimer(Coder window && timer.getStateFamily().equals(stateFamily)) .transform( timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, - timer, - windowCoder, - getDrainMode())) + WindmillStateTagUtil.instance() + .windmillTimerToTimerData( + WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, + timer, + windowCoder, + getDrainMode())) .iterator()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java index 415dab526bb5..ad5f2b0dd40a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -97,11 +98,12 @@ public Iterable timersIterable() { .append(nonEventTimers) .transform( timer -> - WindmillTimerInternals.windmillTimerToTimerData( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - timer, - windowCoder, - drainMode)); + WindmillStateTagUtil.instance() + .windmillTimerToTimerData( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + timer, + windowCoder, + drainMode)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index 8dac9d11715e..e1d89fc10a17 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -17,29 +17,22 @@ */ package org.apache.beam.runners.dataflow.worker; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; -import java.io.IOException; import java.util.AbstractMap.SimpleEntry; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.function.Consumer; import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.VarInt; -import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -54,12 +47,6 @@ }) class WindmillTimerInternals implements TimerInternals { - private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = - GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); - - private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = - BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); - // Map from timer id to its TimerData. If it is to be deleted, we still need // its time domain here. Note that TimerData is unique per ID and namespace, // though technically in Windmill this is only enforced per ID and namespace @@ -200,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) { TimerData timerData = value.getKey(); Timer.Builder timer = - buildWindmillTimerFromTimerData( + windmillStateTagUtil.buildWindmillTimerFromTimerData( stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder()); if (value.getValue()) { @@ -262,208 +249,4 @@ public static boolean isSystemTimer(Windmill.Timer timer) { public static boolean isUserTimer(Windmill.Timer timer) { return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString()); } - - /** - * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. - * - * @return the input builder for chaining - */ - static Timer.Builder buildWindmillTimerFromTimerData( - @Nullable String stateFamily, - WindmillNamespacePrefix prefix, - TimerData timerData, - Timer.Builder builder) { - - builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); - - if (stateFamily != null) { - builder.setStateFamily(stateFamily); - } - - builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); - - // Store the output timestamp in the metadata timestamp. - Instant outputTimestamp = timerData.getOutputTimestamp(); - if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { - // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of - // the global window - // here instead. - outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; - } - builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); - return builder; - } - - static Timer timerDataToWindmillTimer( - @Nullable String stateFamily, WindmillNamespacePrefix prefix, TimerData timerData) { - return buildWindmillTimerFromTimerData(stateFamily, prefix, timerData, Timer.newBuilder()) - .build(); - } - - public static TimerData windmillTimerToTimerData( - WindmillNamespacePrefix prefix, - Timer timer, - Coder windowCoder, - boolean draining) { - - // The tag is a path-structure string but cheaper to parse than a proper URI. It follows - // this pattern, where no component but the ID can contain a slash - // - // prefix namespace '+' id '+' familyId - // - // prefix ::= '/' prefix_char - // namespace ::= '/' | '/' window '/' - // id ::= autogenerated_id | arbitrary_string - // autogenerated_id ::= timedomain_ordinal ':' millis - // - // Notes: - // - // - the slashes and whaatnot in prefix and namespace are owned by that bit of code - // - the prefix_char is always ASCII 'u' or 's' for "user" or "system" - // - the namespace is generally a base64 encoding of the window passed through its coder, but: - // - the GlobalWindow is currently encoded in zero bytes, so it becomes "//" - // - the Global StateNamespace is different, and becomes "/" - // - the id is totally arbitrary; currently unescaped though that could change - - ByteString tag = timer.getTag(); - checkArgument( - tag.startsWith(prefix.byteString()), - "Expected timer tag %s to start with prefix %s", - tag, - prefix.byteString()); - - Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); - - // Parse the namespace. - int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash - int namespaceEnd = namespaceStart; - while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { - namespaceEnd++; - } - String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8(); - - // Parse the timer id. - int timerIdStart = namespaceEnd + 1; - int timerIdEnd = timerIdStart; - while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') { - timerIdEnd++; - } - String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8(); - - // Parse the timer family. - int timerFamilyStart = timerIdEnd + 1; - int timerFamilyEnd = timerFamilyStart; - while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') { - timerFamilyEnd++; - } - // For backwards compatibility, handle the case were the timer family isn't present. - String timerFamily = - (timerFamilyStart < tag.size()) - ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8() - : ""; - - // For backwards compatibility, parse the output timestamp from the tag. Not using '+' as a - // terminator because the - // output timestamp is the last segment in the tag and the timestamp encoding itself may contain - // '+'. - int outputTimestampStart = timerFamilyEnd + 1; - int outputTimestampEnd = tag.size(); - - // For backwards compatibility, handle the case were the output timestamp isn't present. - Instant outputTimestamp = timestamp; - if ((outputTimestampStart < tag.size())) { - try { - outputTimestamp = - new Instant( - VarInt.decodeLong( - tag.substring(outputTimestampStart, outputTimestampEnd).newInput())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (timer.hasMetadataTimestamp()) { - // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure - // to change the upper - // bound. - outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp()); - if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) { - outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE; - } - } - - StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); - return TimerData.of( - timerId, - timerFamily, - namespace, - timestamp, - outputTimestamp, - timerTypeToTimeDomain(timer.getType())); - // todo add draining (https://github.com/apache/beam/issues/36884) - - } - - private static boolean useNewTimerTagEncoding(TimerData timerData) { - return !timerData.getTimerFamilyId().isEmpty(); - } - - /** - * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and - * timestamp. - * - *

This is necessary because Windmill will deduplicate based only on this tag. - */ - public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { - String tagString; - if (useNewTimerTagEncoding(timerData)) { - tagString = - prefix.byteString().toStringUtf8() - + // this never ends with a slash - timerData.getNamespace().stringKey() - + // this must begin and end with a slash - '+' - + timerData.getTimerId() - + // this is arbitrary; currently unescaped - '+' - + timerData.getTimerFamilyId(); - } else { - // Timers without timerFamily would have timerFamily would be an empty string - tagString = - prefix.byteString().toStringUtf8() - + // this never ends with a slash - timerData.getNamespace().stringKey() - + // this must begin and end with a slash - '+' - + timerData.getTimerId() // this is arbitrary; currently unescaped - ; - } - return ByteString.copyFromUtf8(tagString); - } - - @VisibleForTesting - static Timer.Type timerType(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return Timer.Type.WATERMARK; - case PROCESSING_TIME: - return Timer.Type.REALTIME; - case SYNCHRONIZED_PROCESSING_TIME: - return Timer.Type.DEPENDENT_REALTIME; - default: - throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); - } - } - - @VisibleForTesting - static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) { - switch (type) { - case REALTIME: - return TimeDomain.PROCESSING_TIME; - case DEPENDENT_REALTIME: - return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; - case WATERMARK: - return TimeDomain.EVENT_TIME; - default: - throw new IllegalArgumentException("Unsupported timer type " + type); - } - } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java index 12b4001d530f..6f6ea02938ed 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtil.java @@ -17,23 +17,42 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + import java.io.IOException; import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream; import org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.VarInt; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; @Internal @ThreadSafe public class WindmillStateTagUtil { + private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE = + GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)); + + private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE = + BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1)); private static final String TIMER_HOLD_PREFIX = "/h"; private static final WindmillStateTagUtil INSTANCE = new WindmillStateTagUtil(); @@ -99,6 +118,202 @@ public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData timerDa return ByteString.copyFromUtf8(tagString); } + /** + * Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and + * timestamp. + * + *

This is necessary because Windmill will deduplicate based only on this tag. + */ + public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) { + String tagString; + if (useNewTimerTagEncoding(timerData)) { + tagString = + prefix.byteString().toStringUtf8() + + // this never ends with a slash + timerData.getNamespace().stringKey() + + // this must begin and end with a slash + '+' + + timerData.getTimerId() + + // this is arbitrary; currently unescaped + '+' + + timerData.getTimerFamilyId(); + } else { + // Timers without timerFamily would have timerFamily would be an empty string + tagString = + prefix.byteString().toStringUtf8() + + // this never ends with a slash + timerData.getNamespace().stringKey() + + // this must begin and end with a slash + '+' + + timerData.getTimerId() // this is arbitrary; currently unescaped + ; + } + return ByteString.copyFromUtf8(tagString); + } + + public TimerData windmillTimerToTimerData( + WindmillNamespacePrefix prefix, + Timer timer, + Coder windowCoder, + boolean draining) { + + // The tag is a path-structure string but cheaper to parse than a proper URI. It follows + // this pattern, where no component but the ID can contain a slash + // + // prefix namespace '+' id '+' familyId + // + // prefix ::= '/' prefix_char + // namespace ::= '/' | '/' window '/' + // id ::= autogenerated_id | arbitrary_string + // autogenerated_id ::= timedomain_ordinal ':' millis + // + // Notes: + // + // - the slashes and whaatnot in prefix and namespace are owned by that bit of code + // - the prefix_char is always ASCII 'u' or 's' for "user" or "system" + // - the namespace is generally a base64 encoding of the window passed through its coder, but: + // - the GlobalWindow is currently encoded in zero bytes, so it becomes "//" + // - the Global StateNamespace is different, and becomes "/" + // - the id is totally arbitrary; currently unescaped though that could change + + ByteString tag = timer.getTag(); + checkArgument( + tag.startsWith(prefix.byteString()), + "Expected timer tag %s to start with prefix %s", + tag, + prefix.byteString()); + + Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp()); + + // Parse the namespace. + int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash + int namespaceEnd = namespaceStart; + while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') { + namespaceEnd++; + } + String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8(); + + // Parse the timer id. + int timerIdStart = namespaceEnd + 1; + int timerIdEnd = timerIdStart; + while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') { + timerIdEnd++; + } + String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8(); + + // Parse the timer family. + int timerFamilyStart = timerIdEnd + 1; + int timerFamilyEnd = timerFamilyStart; + while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') { + timerFamilyEnd++; + } + // For backwards compatibility, handle the case were the timer family isn't present. + String timerFamily = + (timerFamilyStart < tag.size()) + ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8() + : ""; + + // For backwards compatibility, parse the output timestamp from the tag. Not using '+' as a + // terminator because the + // output timestamp is the last segment in the tag and the timestamp encoding itself may contain + // '+'. + int outputTimestampStart = timerFamilyEnd + 1; + int outputTimestampEnd = tag.size(); + + // For backwards compatibility, handle the case were the output timestamp isn't present. + Instant outputTimestamp = timestamp; + if ((outputTimestampStart < tag.size())) { + try { + outputTimestamp = + new Instant( + VarInt.decodeLong( + tag.substring(outputTimestampStart, outputTimestampEnd).newInput())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else if (timer.hasMetadataTimestamp()) { + // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure + // to change the upper + // bound. + outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp()); + if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) { + outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE; + } + } + + StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder); + return TimerData.of( + timerId, + timerFamily, + namespace, + timestamp, + outputTimestamp, + timerTypeToTimeDomain(timer.getType())); + // todo add draining (https://github.com/apache/beam/issues/36884) + + } + + /** + * Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}. + * + * @return the input builder for chaining + */ + public Timer.Builder buildWindmillTimerFromTimerData( + @Nullable String stateFamily, + WindmillNamespacePrefix prefix, + TimerData timerData, + Timer.Builder builder) { + + builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain())); + + if (stateFamily != null) { + builder.setStateFamily(stateFamily); + } + + builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp())); + + // Store the output timestamp in the metadata timestamp. + Instant outputTimestamp = timerData.getOutputTimestamp(); + if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) { + // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of + // the global window + // here instead. + outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE; + } + builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp)); + return builder; + } + + private static Timer.Type timerType(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return Timer.Type.WATERMARK; + case PROCESSING_TIME: + return Timer.Type.REALTIME; + case SYNCHRONIZED_PROCESSING_TIME: + return Timer.Type.DEPENDENT_REALTIME; + default: + throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain); + } + } + + private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) { + switch (type) { + case REALTIME: + return TimeDomain.PROCESSING_TIME; + case DEPENDENT_REALTIME: + return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + case WATERMARK: + return TimeDomain.EVENT_TIME; + default: + throw new IllegalArgumentException("Unsupported timer type " + type); + } + } + + private static boolean useNewTimerTagEncoding(TimerData timerData) { + return !timerData.getTimerFamilyId().isEmpty(); + } + /** @return the singleton WindmillStateTagUtil */ public static WindmillStateTagUtil instance() { return INSTANCE; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index f7852ec1767d..1ae9678eed60 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.InputMessageBundle; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -148,15 +149,16 @@ private void addTimer( .getTimersBuilder() .addTimersBuilder() .setTag( - WindmillTimerInternals.timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - TimerData.of( - namespace, - timestamp, - timestamp, - type == Windmill.Timer.Type.WATERMARK - ? TimeDomain.EVENT_TIME - : TimeDomain.PROCESSING_TIME))) + WindmillStateTagUtil.instance() + .timerTag( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + TimerData.of( + namespace, + timestamp, + timestamp, + type == Windmill.Timer.Type.WATERMARK + ? TimeDomain.EVENT_TIME + : TimeDomain.PROCESSING_TIME))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) .setType(type) .setStateFamily(STATE_FAMILY); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index bbdde4498605..a722b454e38e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem.FakeKeyedWorkItemCoder; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CollectionCoder; import org.apache.beam.sdk.coders.KvCoder; @@ -184,13 +185,14 @@ private static Windmill.Timer makeSerializedTimer( StateNamespace ns, long timestamp, Windmill.Timer.Type type) { return Windmill.Timer.newBuilder() .setTag( - WindmillTimerInternals.timerTag( - WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, - TimerData.of( - ns, - new Instant(timestamp), - new Instant(timestamp), - WindmillTimerInternals.timerTypeToTimeDomain(type)))) + WindmillStateTagUtil.instance() + .timerTag( + WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, + TimerData.of( + ns, + new Instant(timestamp), + new Instant(timestamp), + timerTypeToTimeDomain(type)))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -254,4 +256,17 @@ public void testDrainPropagated() throws Exception { keyedWorkItem.timersIterable(), Matchers.contains(makeTimer(STATE_NAMESPACE_2, 3, TimeDomain.EVENT_TIME))); } + + private static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) { + switch (type) { + case REALTIME: + return TimeDomain.PROCESSING_TIME; + case DEPENDENT_REALTIME: + return TimeDomain.SYNCHRONIZED_PROCESSING_TIME; + case WATERMARK: + return TimeDomain.EVENT_TIME; + default: + throw new IllegalArgumentException("Unsupported timer type " + type); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java deleted file mode 100644 index 4780cd768efb..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; - -import java.util.List; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Unit tests for {@link WindmillTimerInternals}. */ -@RunWith(JUnit4.class) -public class WindmillTimerInternalsTest { - - private static final List, StateNamespace>> - TEST_NAMESPACES_WITH_CODERS = - ImmutableList.of( - KV.of(null, StateNamespaces.global()), - KV.of( - GlobalWindow.Coder.INSTANCE, - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE)), - KV.of( - IntervalWindow.getCoder(), - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(13), new Instant(47))))); - - private static final List TEST_TIMESTAMPS = - ImmutableList.of( - BoundedWindow.TIMESTAMP_MIN_VALUE, - BoundedWindow.TIMESTAMP_MAX_VALUE, - GlobalWindow.INSTANCE.maxTimestamp(), - new Instant(0), - new Instant(127), - // The encoding of Instant(716000) ends with '+'. - new Instant(716001)); - - private static final List TEST_STATE_FAMILIES = ImmutableList.of("", "F24"); - - private static final List TEST_TIMER_IDS = - ImmutableList.of("", "foo", "this one has spaces", "this/one/has/slashes", "/"); - - @Test - public void testTimerDataToFromTimer() { - for (String stateFamily : TEST_STATE_FAMILIES) { - for (KV, StateNamespace> coderAndNamespace : - TEST_NAMESPACES_WITH_CODERS) { - - @Nullable Coder coder = coderAndNamespace.getKey(); - StateNamespace namespace = coderAndNamespace.getValue(); - - for (TimeDomain timeDomain : TimeDomain.values()) { - for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) { - for (Instant timestamp : TEST_TIMESTAMPS) { - List anonymousTimers = - ImmutableList.of( - TimerData.of(namespace, timestamp, timestamp, timeDomain), - TimerData.of( - namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain)); - for (TimerData timer : anonymousTimers) { - Instant expectedTimestamp = - timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) - ? BoundedWindow.TIMESTAMP_MIN_VALUE - : timer.getOutputTimestamp(); - TimerData computed = - WindmillTimerInternals.windmillTimerToTimerData( - prefix, - WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer), - coder, - false); - // The function itself bounds output, so we dont expect the original input as the - // output, we expect it to be bounded - TimerData expected = - TimerData.of( - timer.getNamespace(), timestamp, expectedTimestamp, timer.getDomain()); - - assertThat(computed, equalTo(expected)); - } - - for (String timerId : TEST_TIMER_IDS) { - List timers = - ImmutableList.of( - TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain), - TimerData.of( - timerId, "family", namespace, timestamp, timestamp, timeDomain), - TimerData.of( - timerId, - namespace, - timestamp, - timestamp.minus(Duration.millis(1)), - timeDomain), - TimerData.of( - timerId, - "family", - namespace, - timestamp, - timestamp.minus(Duration.millis(1)), - timeDomain)); - - for (TimerData timer : timers) { - Instant expectedTimestamp = - timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) - ? BoundedWindow.TIMESTAMP_MIN_VALUE - : timer.getOutputTimestamp(); - - TimerData expected = - TimerData.of( - timer.getTimerId(), - timer.getTimerFamilyId(), - timer.getNamespace(), - timer.getTimestamp(), - expectedTimestamp, - timer.getDomain()); - assertThat( - WindmillTimerInternals.windmillTimerToTimerData( - prefix, - WindmillTimerInternals.timerDataToWindmillTimer( - stateFamily, prefix, timer), - coder, - false), - equalTo(expected)); - } - } - } - } - } - } - } - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java index 2c742883809e..eb4713695dc6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateTagUtilTest.java @@ -17,23 +17,67 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.List; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; +import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WindmillStateTagUtilTest { + private static final List, StateNamespace>> + TEST_NAMESPACES_WITH_CODERS = + ImmutableList.of( + KV.of(null, StateNamespaces.global()), + KV.of( + GlobalWindow.Coder.INSTANCE, + StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE)), + KV.of( + IntervalWindow.getCoder(), + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow(new Instant(13), new Instant(47))))); + + private static final List TEST_TIMESTAMPS = + ImmutableList.of( + BoundedWindow.TIMESTAMP_MIN_VALUE, + BoundedWindow.TIMESTAMP_MAX_VALUE, + GlobalWindow.INSTANCE.maxTimestamp(), + new Instant(0), + new Instant(127), + // The encoding of Instant(716000) ends with '+'. + new Instant(716001)); + + private static final List TEST_STATE_FAMILIES = ImmutableList.of("", "F24"); + + private static final List TEST_TIMER_IDS = + ImmutableList.of("", "foo", "this one has spaces", "this/one/has/slashes", "/"); @Test public void testEncodeKey() { @@ -84,4 +128,99 @@ public void appendTo(Appendable sb) throws IOException { InternedByteString bytes = WindmillStateTagUtil.instance().encodeKey(namespace2, tag2); assertEquals("namespace2+tag2", bytes.byteString().toStringUtf8()); } + + @Test + public void testTimerDataToFromTimer() { + for (String stateFamily : TEST_STATE_FAMILIES) { + for (KV, StateNamespace> coderAndNamespace : + TEST_NAMESPACES_WITH_CODERS) { + + @Nullable Coder coder = coderAndNamespace.getKey(); + StateNamespace namespace = coderAndNamespace.getValue(); + + for (TimeDomain timeDomain : TimeDomain.values()) { + for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) { + for (Instant timestamp : TEST_TIMESTAMPS) { + List anonymousTimers = + ImmutableList.of( + TimerData.of(namespace, timestamp, timestamp, timeDomain), + TimerData.of( + namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain)); + for (TimerData timer : anonymousTimers) { + Instant expectedTimestamp = + timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) + ? BoundedWindow.TIMESTAMP_MIN_VALUE + : timer.getOutputTimestamp(); + TimerData computed = + WindmillStateTagUtil.instance() + .windmillTimerToTimerData( + prefix, + WindmillStateTagUtil.instance() + .buildWindmillTimerFromTimerData( + stateFamily, prefix, timer, Timer.newBuilder()) + .build(), + coder, + false); + // The function itself bounds output, so we dont expect the original input as the + // output, we expect it to be bounded + TimerData expected = + TimerData.of( + timer.getNamespace(), timestamp, expectedTimestamp, timer.getDomain()); + + assertThat(computed, equalTo(expected)); + } + + for (String timerId : TEST_TIMER_IDS) { + List timers = + ImmutableList.of( + TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain), + TimerData.of( + timerId, "family", namespace, timestamp, timestamp, timeDomain), + TimerData.of( + timerId, + namespace, + timestamp, + timestamp.minus(Duration.millis(1)), + timeDomain), + TimerData.of( + timerId, + "family", + namespace, + timestamp, + timestamp.minus(Duration.millis(1)), + timeDomain)); + + for (TimerData timer : timers) { + Instant expectedTimestamp = + timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) + ? BoundedWindow.TIMESTAMP_MIN_VALUE + : timer.getOutputTimestamp(); + + TimerData expected = + TimerData.of( + timer.getTimerId(), + timer.getTimerFamilyId(), + timer.getNamespace(), + timer.getTimestamp(), + expectedTimestamp, + timer.getDomain()); + assertThat( + WindmillStateTagUtil.instance() + .windmillTimerToTimerData( + prefix, + WindmillStateTagUtil.instance() + .buildWindmillTimerFromTimerData( + stateFamily, prefix, timer, Timer.newBuilder()) + .build(), + coder, + false), + equalTo(expected)); + } + } + } + } + } + } + } + } }