Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -823,11 +823,12 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator();
}

Expand Down Expand Up @@ -886,11 +887,12 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
Expand Down Expand Up @@ -97,11 +98,12 @@ public Iterable<TimerData> timersIterable() {
.append(nonEventTimers)
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,22 @@
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand All @@ -54,12 +47,6 @@
})
class WindmillTimerInternals implements TimerInternals {

private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));

private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));

// Map from timer id to its TimerData. If it is to be deleted, we still need
// its time domain here. Note that TimerData is unique per ID and namespace,
// though technically in Windmill this is only enforced per ID and namespace
Expand Down Expand Up @@ -200,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
TimerData timerData = value.getKey();

Timer.Builder timer =
buildWindmillTimerFromTimerData(
windmillStateTagUtil.buildWindmillTimerFromTimerData(
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());

if (value.getValue()) {
Expand Down Expand Up @@ -262,208 +249,4 @@ public static boolean isSystemTimer(Windmill.Timer timer) {
public static boolean isUserTimer(Windmill.Timer timer) {
return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString());
}

/**
* Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}.
*
* @return the input builder for chaining
*/
static Timer.Builder buildWindmillTimerFromTimerData(
@Nullable String stateFamily,
WindmillNamespacePrefix prefix,
TimerData timerData,
Timer.Builder builder) {

builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain()));

if (stateFamily != null) {
builder.setStateFamily(stateFamily);
}

builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));

// Store the output timestamp in the metadata timestamp.
Instant outputTimestamp = timerData.getOutputTimestamp();
if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
// We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of
// the global window
// here instead.
outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
}
builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
return builder;
}

static Timer timerDataToWindmillTimer(
@Nullable String stateFamily, WindmillNamespacePrefix prefix, TimerData timerData) {
return buildWindmillTimerFromTimerData(stateFamily, prefix, timerData, Timer.newBuilder())
.build();
}

public static TimerData windmillTimerToTimerData(
WindmillNamespacePrefix prefix,
Timer timer,
Coder<? extends BoundedWindow> windowCoder,
boolean draining) {

// The tag is a path-structure string but cheaper to parse than a proper URI. It follows
// this pattern, where no component but the ID can contain a slash
//
// prefix namespace '+' id '+' familyId
//
// prefix ::= '/' prefix_char
// namespace ::= '/' | '/' window '/'
// id ::= autogenerated_id | arbitrary_string
// autogenerated_id ::= timedomain_ordinal ':' millis
//
// Notes:
//
// - the slashes and whaatnot in prefix and namespace are owned by that bit of code
// - the prefix_char is always ASCII 'u' or 's' for "user" or "system"
// - the namespace is generally a base64 encoding of the window passed through its coder, but:
// - the GlobalWindow is currently encoded in zero bytes, so it becomes "//"
// - the Global StateNamespace is different, and becomes "/"
// - the id is totally arbitrary; currently unescaped though that could change

ByteString tag = timer.getTag();
checkArgument(
tag.startsWith(prefix.byteString()),
"Expected timer tag %s to start with prefix %s",
tag,
prefix.byteString());

Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());

// Parse the namespace.
int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash
int namespaceEnd = namespaceStart;
while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
namespaceEnd++;
}
String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8();

// Parse the timer id.
int timerIdStart = namespaceEnd + 1;
int timerIdEnd = timerIdStart;
while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') {
timerIdEnd++;
}
String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8();

// Parse the timer family.
int timerFamilyStart = timerIdEnd + 1;
int timerFamilyEnd = timerFamilyStart;
while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') {
timerFamilyEnd++;
}
// For backwards compatibility, handle the case were the timer family isn't present.
String timerFamily =
(timerFamilyStart < tag.size())
? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
: "";

// For backwards compatibility, parse the output timestamp from the tag. Not using '+' as a
// terminator because the
// output timestamp is the last segment in the tag and the timestamp encoding itself may contain
// '+'.
int outputTimestampStart = timerFamilyEnd + 1;
int outputTimestampEnd = tag.size();

// For backwards compatibility, handle the case were the output timestamp isn't present.
Instant outputTimestamp = timestamp;
if ((outputTimestampStart < tag.size())) {
try {
outputTimestamp =
new Instant(
VarInt.decodeLong(
tag.substring(outputTimestampStart, outputTimestampEnd).newInput()));
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (timer.hasMetadataTimestamp()) {
// We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure
// to change the upper
// bound.
outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
}
}

StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
return TimerData.of(
timerId,
timerFamily,
namespace,
timestamp,
outputTimestamp,
timerTypeToTimeDomain(timer.getType()));
// todo add draining (https://github.com/apache/beam/issues/36884)

}

private static boolean useNewTimerTagEncoding(TimerData timerData) {
return !timerData.getTimerFamilyId().isEmpty();
}

/**
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
* timestamp.
*
* <p>This is necessary because Windmill will deduplicate based only on this tag.
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
String tagString;
if (useNewTimerTagEncoding(timerData)) {
tagString =
prefix.byteString().toStringUtf8()
+ // this never ends with a slash
timerData.getNamespace().stringKey()
+ // this must begin and end with a slash
'+'
+ timerData.getTimerId()
+ // this is arbitrary; currently unescaped
'+'
+ timerData.getTimerFamilyId();
} else {
// Timers without timerFamily would have timerFamily would be an empty string
tagString =
prefix.byteString().toStringUtf8()
+ // this never ends with a slash
timerData.getNamespace().stringKey()
+ // this must begin and end with a slash
'+'
+ timerData.getTimerId() // this is arbitrary; currently unescaped
;
}
return ByteString.copyFromUtf8(tagString);
}

@VisibleForTesting
static Timer.Type timerType(TimeDomain domain) {
switch (domain) {
case EVENT_TIME:
return Timer.Type.WATERMARK;
case PROCESSING_TIME:
return Timer.Type.REALTIME;
case SYNCHRONIZED_PROCESSING_TIME:
return Timer.Type.DEPENDENT_REALTIME;
default:
throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain);
}
}

@VisibleForTesting
static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) {
switch (type) {
case REALTIME:
return TimeDomain.PROCESSING_TIME;
case DEPENDENT_REALTIME:
return TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
case WATERMARK:
return TimeDomain.EVENT_TIME;
default:
throw new IllegalArgumentException("Unsupported timer type " + type);
}
}
}
Loading
Loading