Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

@Override
public String stringKey() {
try {
Expand Down Expand Up @@ -170,6 +174,10 @@ public W getWindow() {
return window;
}

public Coder<W> getWindowCoder() {
return windowCoder;
}

public int getTriggerIndex() {
return triggerIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ private StateTags() {}

private interface SystemStateTag<StateT extends State> {
StateTag<StateT> asKind(StateKind kind);

StateKind getKind();
}

/** Create a state tag for the given id and spec. */
Expand Down Expand Up @@ -243,6 +245,16 @@ public static <StateT extends State> StateTag<StateT> makeSystemTagInternal(
return typedTag.asKind(StateKind.SYSTEM);
}

/*
* Returns true if the tag is a system internal tag.
*/
public static <StateT extends State> boolean isSystemTagInternal(StateTag<StateT> tag) {
if (!(tag instanceof SystemStateTag)) {
return false;
}
return StateKind.SYSTEM.equals(((SystemStateTag<?>) tag).getKind());
}

public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>> convertToBagTagInternal(
StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
Expand Down Expand Up @@ -358,6 +370,11 @@ public StateTag<StateT> asKind(StateKind kind) {
return new SimpleStateTag<>(id.asKind(kind), spec);
}

@Override
public StateKind getKind() {
return id.kind;
}

@Override
public boolean equals(@Nullable Object other) {
if (!(other instanceof SimpleStateTag)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -119,6 +122,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> sideInputCache;

private final WindmillTagEncoding windmillTagEncoding;
/**
* The current user-facing key for this execution context.
*
Expand Down Expand Up @@ -152,13 +156,14 @@ public StreamingModeExecutionContext(
String computationId,
ReaderCache readerCache,
Map<String, String> stateNameMap,
WindmillStateCache.ForComputation stateCache,
ForComputation stateCache,
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
boolean throwExceptionOnLargeOutput,
boolean enableWindmillTagEncodingV2) {
super(
counterFactory,
metricsContainerRegistry,
Expand All @@ -169,6 +174,10 @@ public StreamingModeExecutionContext(
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.windmillTagEncoding =
enableWindmillTagEncodingV2
? WindmillTagEncodingV2.instance()
: WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
Expand Down Expand Up @@ -200,6 +209,10 @@ public boolean getDrainMode() {
return work != null ? work.getDrainMode() : false;
}

public WindmillTagEncoding getWindmillTagEncoding() {
return windmillTagEncoding;
}

public boolean offsetBasedDeduplicationSupported() {
return activeReader != null
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
Expand Down Expand Up @@ -777,7 +790,7 @@ public void start(
stateReader,
getWorkItem().getIsNewKey(),
cacheForKey.forFamily(stateFamily),
WindmillStateTagUtil.instance(),
windmillTagEncoding,
scopedReadStateSupplier);

this.systemTimerInternals =
Expand All @@ -786,7 +799,7 @@ public void start(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
processingTime,
watermarks,
WindmillStateTagUtil.instance(),
windmillTagEncoding,
td -> {});

this.userTimerInternals =
Expand All @@ -795,7 +808,7 @@ public void start(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
processingTime,
watermarks,
WindmillStateTagUtil.instance(),
windmillTagEncoding,
this::onUserTimerModified);

this.cachedFiredSystemTimers = null;
Expand Down Expand Up @@ -823,12 +836,11 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator();
}

Expand Down Expand Up @@ -887,12 +899,11 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
Expand Down Expand Up @@ -68,19 +68,22 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
private final transient Coder<? extends BoundedWindow> windowCoder;
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
private final transient Coder<ElemT> valueCoder;
private final WindmillTagEncoding windmillTagEncoding;

public WindmillKeyedWorkItem(
K key,
Windmill.WorkItem workItem,
Coder<? extends BoundedWindow> windowCoder,
Coder<Collection<? extends BoundedWindow>> windowsCoder,
Coder<ElemT> valueCoder,
WindmillTagEncoding windmillTagEncoding,
boolean drainMode) {
this.key = key;
this.workItem = workItem;
this.windowCoder = windowCoder;
this.windowsCoder = windowsCoder;
this.valueCoder = valueCoder;
this.windmillTagEncoding = windmillTagEncoding;
this.drainMode = drainMode;
}

Expand All @@ -98,12 +101,11 @@ public Iterable<TimerData> timersIterable() {
.append(nonEventTimers)
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -61,26 +61,26 @@ class WindmillTimerInternals implements TimerInternals {
private final String stateFamily;
private final WindmillNamespacePrefix prefix;
private final Consumer<TimerData> onTimerModified;
private final WindmillStateTagUtil windmillStateTagUtil;
private final WindmillTagEncoding windmillTagEncoding;

public WindmillTimerInternals(
String stateFamily, // unique identifies a step
WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
Instant processingTime,
Watermarks watermarks,
WindmillStateTagUtil windmillStateTagUtil,
WindmillTagEncoding windmillTagEncoding,
Consumer<TimerData> onTimerModified) {
this.watermarks = watermarks;
this.processingTime = checkNotNull(processingTime);
this.stateFamily = stateFamily;
this.prefix = prefix;
this.windmillStateTagUtil = windmillStateTagUtil;
this.windmillTagEncoding = windmillTagEncoding;
this.onTimerModified = onTimerModified;
}

public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
return new WindmillTimerInternals(
stateFamily, prefix, processingTime, watermarks, windmillStateTagUtil, onTimerModified);
stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified);
}

@Override
Expand Down Expand Up @@ -187,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
TimerData timerData = value.getKey();

Timer.Builder timer =
windmillStateTagUtil.buildWindmillTimerFromTimerData(
windmillTagEncoding.buildWindmillTimerFromTimerData(
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());

if (value.getValue()) {
Expand All @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
Expand All @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Clear the hold in case a previous iteration of this timer set one.
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
new WindmillKeyedWorkItem<>(
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
key,
workItem,
windowCoder,
windowsCoder,
valueCoder,
context.getWindmillTagEncoding(),
context.getDrainMode());
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
Expand Down
Loading
Loading