Skip to content

Commit 7e5e3b8

Browse files
authored
[Dataflow Streaming] Move timer tag logic to WindmillStateTagUtil (#37006)
1 parent d7cd81a commit 7e5e3b8

File tree

8 files changed

+407
-409
lines changed

8 files changed

+407
-409
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -823,11 +823,12 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
823823
&& timer.getStateFamily().equals(stateFamily))
824824
.transform(
825825
timer ->
826-
WindmillTimerInternals.windmillTimerToTimerData(
827-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
828-
timer,
829-
windowCoder,
830-
getDrainMode()))
826+
WindmillStateTagUtil.instance()
827+
.windmillTimerToTimerData(
828+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
829+
timer,
830+
windowCoder,
831+
getDrainMode()))
831832
.iterator();
832833
}
833834

@@ -886,11 +887,12 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
886887
&& timer.getStateFamily().equals(stateFamily))
887888
.transform(
888889
timer ->
889-
WindmillTimerInternals.windmillTimerToTimerData(
890-
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
891-
timer,
892-
windowCoder,
893-
getDrainMode()))
890+
WindmillStateTagUtil.instance()
891+
.windmillTimerToTimerData(
892+
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
893+
timer,
894+
windowCoder,
895+
getDrainMode()))
894896
.iterator());
895897
}
896898

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.beam.runners.core.TimerInternals.TimerData;
3131
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3232
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
33+
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
3334
import org.apache.beam.sdk.coders.Coder;
3435
import org.apache.beam.sdk.coders.KvCoder;
3536
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -97,11 +98,12 @@ public Iterable<TimerData> timersIterable() {
9798
.append(nonEventTimers)
9899
.transform(
99100
timer ->
100-
WindmillTimerInternals.windmillTimerToTimerData(
101-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
102-
timer,
103-
windowCoder,
104-
drainMode));
101+
WindmillStateTagUtil.instance()
102+
.windmillTimerToTimerData(
103+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
104+
timer,
105+
windowCoder,
106+
drainMode));
105107
}
106108

107109
@Override

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java

Lines changed: 1 addition & 218 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,22 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker;
1919

20-
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2120
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2221

23-
import java.io.IOException;
2422
import java.util.AbstractMap.SimpleEntry;
2523
import java.util.HashMap;
2624
import java.util.Map;
2725
import java.util.Map.Entry;
2826
import java.util.function.Consumer;
2927
import org.apache.beam.runners.core.StateNamespace;
30-
import org.apache.beam.runners.core.StateNamespaces;
3128
import org.apache.beam.runners.core.TimerInternals;
3229
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
3330
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
3431
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
3532
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
36-
import org.apache.beam.sdk.coders.Coder;
3733
import org.apache.beam.sdk.state.TimeDomain;
3834
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3935
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
40-
import org.apache.beam.sdk.util.VarInt;
41-
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
42-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
4336
import org.checkerframework.checker.nullness.qual.Nullable;
4437
import org.joda.time.Duration;
4538
import org.joda.time.Instant;
@@ -54,12 +47,6 @@
5447
})
5548
class WindmillTimerInternals implements TimerInternals {
5649

57-
private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
58-
GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));
59-
60-
private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
61-
BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
62-
6350
// Map from timer id to its TimerData. If it is to be deleted, we still need
6451
// its time domain here. Note that TimerData is unique per ID and namespace,
6552
// though technically in Windmill this is only enforced per ID and namespace
@@ -200,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
200187
TimerData timerData = value.getKey();
201188

202189
Timer.Builder timer =
203-
buildWindmillTimerFromTimerData(
190+
windmillStateTagUtil.buildWindmillTimerFromTimerData(
204191
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());
205192

206193
if (value.getValue()) {
@@ -262,208 +249,4 @@ public static boolean isSystemTimer(Windmill.Timer timer) {
262249
public static boolean isUserTimer(Windmill.Timer timer) {
263250
return timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString());
264251
}
265-
266-
/**
267-
* Uses the given {@link Timer} builder to build a windmill {@link Timer} from {@link TimerData}.
268-
*
269-
* @return the input builder for chaining
270-
*/
271-
static Timer.Builder buildWindmillTimerFromTimerData(
272-
@Nullable String stateFamily,
273-
WindmillNamespacePrefix prefix,
274-
TimerData timerData,
275-
Timer.Builder builder) {
276-
277-
builder.setTag(timerTag(prefix, timerData)).setType(timerType(timerData.getDomain()));
278-
279-
if (stateFamily != null) {
280-
builder.setStateFamily(stateFamily);
281-
}
282-
283-
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
284-
285-
// Store the output timestamp in the metadata timestamp.
286-
Instant outputTimestamp = timerData.getOutputTimestamp();
287-
if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
288-
// We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of
289-
// the global window
290-
// here instead.
291-
outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
292-
}
293-
builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
294-
return builder;
295-
}
296-
297-
static Timer timerDataToWindmillTimer(
298-
@Nullable String stateFamily, WindmillNamespacePrefix prefix, TimerData timerData) {
299-
return buildWindmillTimerFromTimerData(stateFamily, prefix, timerData, Timer.newBuilder())
300-
.build();
301-
}
302-
303-
public static TimerData windmillTimerToTimerData(
304-
WindmillNamespacePrefix prefix,
305-
Timer timer,
306-
Coder<? extends BoundedWindow> windowCoder,
307-
boolean draining) {
308-
309-
// The tag is a path-structure string but cheaper to parse than a proper URI. It follows
310-
// this pattern, where no component but the ID can contain a slash
311-
//
312-
// prefix namespace '+' id '+' familyId
313-
//
314-
// prefix ::= '/' prefix_char
315-
// namespace ::= '/' | '/' window '/'
316-
// id ::= autogenerated_id | arbitrary_string
317-
// autogenerated_id ::= timedomain_ordinal ':' millis
318-
//
319-
// Notes:
320-
//
321-
// - the slashes and whaatnot in prefix and namespace are owned by that bit of code
322-
// - the prefix_char is always ASCII 'u' or 's' for "user" or "system"
323-
// - the namespace is generally a base64 encoding of the window passed through its coder, but:
324-
// - the GlobalWindow is currently encoded in zero bytes, so it becomes "//"
325-
// - the Global StateNamespace is different, and becomes "/"
326-
// - the id is totally arbitrary; currently unescaped though that could change
327-
328-
ByteString tag = timer.getTag();
329-
checkArgument(
330-
tag.startsWith(prefix.byteString()),
331-
"Expected timer tag %s to start with prefix %s",
332-
tag,
333-
prefix.byteString());
334-
335-
Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
336-
337-
// Parse the namespace.
338-
int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash
339-
int namespaceEnd = namespaceStart;
340-
while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
341-
namespaceEnd++;
342-
}
343-
String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8();
344-
345-
// Parse the timer id.
346-
int timerIdStart = namespaceEnd + 1;
347-
int timerIdEnd = timerIdStart;
348-
while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') {
349-
timerIdEnd++;
350-
}
351-
String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8();
352-
353-
// Parse the timer family.
354-
int timerFamilyStart = timerIdEnd + 1;
355-
int timerFamilyEnd = timerFamilyStart;
356-
while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') {
357-
timerFamilyEnd++;
358-
}
359-
// For backwards compatibility, handle the case were the timer family isn't present.
360-
String timerFamily =
361-
(timerFamilyStart < tag.size())
362-
? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
363-
: "";
364-
365-
// For backwards compatibility, parse the output timestamp from the tag. Not using '+' as a
366-
// terminator because the
367-
// output timestamp is the last segment in the tag and the timestamp encoding itself may contain
368-
// '+'.
369-
int outputTimestampStart = timerFamilyEnd + 1;
370-
int outputTimestampEnd = tag.size();
371-
372-
// For backwards compatibility, handle the case were the output timestamp isn't present.
373-
Instant outputTimestamp = timestamp;
374-
if ((outputTimestampStart < tag.size())) {
375-
try {
376-
outputTimestamp =
377-
new Instant(
378-
VarInt.decodeLong(
379-
tag.substring(outputTimestampStart, outputTimestampEnd).newInput()));
380-
} catch (IOException e) {
381-
throw new RuntimeException(e);
382-
}
383-
} else if (timer.hasMetadataTimestamp()) {
384-
// We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure
385-
// to change the upper
386-
// bound.
387-
outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
388-
if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
389-
outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
390-
}
391-
}
392-
393-
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
394-
return TimerData.of(
395-
timerId,
396-
timerFamily,
397-
namespace,
398-
timestamp,
399-
outputTimestamp,
400-
timerTypeToTimeDomain(timer.getType()));
401-
// todo add draining (https://github.com/apache/beam/issues/36884)
402-
403-
}
404-
405-
private static boolean useNewTimerTagEncoding(TimerData timerData) {
406-
return !timerData.getTimerFamilyId().isEmpty();
407-
}
408-
409-
/**
410-
* Produce a tag that is guaranteed to be unique for the given prefix, namespace, domain and
411-
* timestamp.
412-
*
413-
* <p>This is necessary because Windmill will deduplicate based only on this tag.
414-
*/
415-
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
416-
String tagString;
417-
if (useNewTimerTagEncoding(timerData)) {
418-
tagString =
419-
prefix.byteString().toStringUtf8()
420-
+ // this never ends with a slash
421-
timerData.getNamespace().stringKey()
422-
+ // this must begin and end with a slash
423-
'+'
424-
+ timerData.getTimerId()
425-
+ // this is arbitrary; currently unescaped
426-
'+'
427-
+ timerData.getTimerFamilyId();
428-
} else {
429-
// Timers without timerFamily would have timerFamily would be an empty string
430-
tagString =
431-
prefix.byteString().toStringUtf8()
432-
+ // this never ends with a slash
433-
timerData.getNamespace().stringKey()
434-
+ // this must begin and end with a slash
435-
'+'
436-
+ timerData.getTimerId() // this is arbitrary; currently unescaped
437-
;
438-
}
439-
return ByteString.copyFromUtf8(tagString);
440-
}
441-
442-
@VisibleForTesting
443-
static Timer.Type timerType(TimeDomain domain) {
444-
switch (domain) {
445-
case EVENT_TIME:
446-
return Timer.Type.WATERMARK;
447-
case PROCESSING_TIME:
448-
return Timer.Type.REALTIME;
449-
case SYNCHRONIZED_PROCESSING_TIME:
450-
return Timer.Type.DEPENDENT_REALTIME;
451-
default:
452-
throw new IllegalArgumentException("Unrecgonized TimeDomain: " + domain);
453-
}
454-
}
455-
456-
@VisibleForTesting
457-
static TimeDomain timerTypeToTimeDomain(Windmill.Timer.Type type) {
458-
switch (type) {
459-
case REALTIME:
460-
return TimeDomain.PROCESSING_TIME;
461-
case DEPENDENT_REALTIME:
462-
return TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
463-
case WATERMARK:
464-
return TimeDomain.EVENT_TIME;
465-
default:
466-
throw new IllegalArgumentException("Unsupported timer type " + type);
467-
}
468-
}
469252
}

0 commit comments

Comments
 (0)