Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> sideInputCache;

private final WindmillTagEncoding windmillTagEncoding;
/**
* The current user-facing key for this execution context.
*
Expand Down Expand Up @@ -169,6 +171,7 @@ public StreamingModeExecutionContext(
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.windmillTagEncoding = WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
Expand Down Expand Up @@ -200,6 +203,10 @@ public boolean getDrainMode() {
return work != null ? work.getDrainMode() : false;
}

public WindmillTagEncoding getWindmillTagEncoding() {
return windmillTagEncoding;
}

public boolean offsetBasedDeduplicationSupported() {
return activeReader != null
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
Expand Down Expand Up @@ -777,7 +784,7 @@ public void start(
stateReader,
getWorkItem().getIsNewKey(),
cacheForKey.forFamily(stateFamily),
WindmillStateTagUtil.instance(),
windmillTagEncoding,
scopedReadStateSupplier);

this.systemTimerInternals =
Expand All @@ -786,7 +793,7 @@ public void start(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
processingTime,
watermarks,
WindmillStateTagUtil.instance(),
windmillTagEncoding,
td -> {});

this.userTimerInternals =
Expand All @@ -795,7 +802,7 @@ public void start(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
processingTime,
watermarks,
WindmillStateTagUtil.instance(),
windmillTagEncoding,
this::onUserTimerModified);

this.cachedFiredSystemTimers = null;
Expand Down Expand Up @@ -823,12 +830,11 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator();
}

Expand Down Expand Up @@ -887,12 +893,11 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
&& timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer,
windowCoder,
getDrainMode()))
.iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
Expand Down Expand Up @@ -68,19 +68,22 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
private final transient Coder<? extends BoundedWindow> windowCoder;
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
private final transient Coder<ElemT> valueCoder;
private final WindmillTagEncoding windmillTagEncoding;

public WindmillKeyedWorkItem(
K key,
Windmill.WorkItem workItem,
Coder<? extends BoundedWindow> windowCoder,
Coder<Collection<? extends BoundedWindow>> windowsCoder,
Coder<ElemT> valueCoder,
WindmillTagEncoding windmillTagEncoding,
boolean drainMode) {
this.key = key;
this.workItem = workItem;
this.windowCoder = windowCoder;
this.windowsCoder = windowsCoder;
this.valueCoder = valueCoder;
this.windmillTagEncoding = windmillTagEncoding;
this.drainMode = drainMode;
}

Expand All @@ -98,12 +101,11 @@ public Iterable<TimerData> timersIterable() {
.append(nonEventTimers)
.transform(
timer ->
WindmillStateTagUtil.instance()
.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
windmillTagEncoding.windmillTimerToTimerData(
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer,
windowCoder,
drainMode));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateTagUtil;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -61,26 +61,26 @@ class WindmillTimerInternals implements TimerInternals {
private final String stateFamily;
private final WindmillNamespacePrefix prefix;
private final Consumer<TimerData> onTimerModified;
private final WindmillStateTagUtil windmillStateTagUtil;
private final WindmillTagEncoding windmillTagEncoding;

public WindmillTimerInternals(
String stateFamily, // unique identifies a step
WindmillNamespacePrefix prefix, // partitions user and system namespaces into "/u" and "/s"
Instant processingTime,
Watermarks watermarks,
WindmillStateTagUtil windmillStateTagUtil,
WindmillTagEncoding windmillTagEncoding,
Consumer<TimerData> onTimerModified) {
this.watermarks = watermarks;
this.processingTime = checkNotNull(processingTime);
this.stateFamily = stateFamily;
this.prefix = prefix;
this.windmillStateTagUtil = windmillStateTagUtil;
this.windmillTagEncoding = windmillTagEncoding;
this.onTimerModified = onTimerModified;
}

public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
return new WindmillTimerInternals(
stateFamily, prefix, processingTime, watermarks, windmillStateTagUtil, onTimerModified);
stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, onTimerModified);
}

@Override
Expand Down Expand Up @@ -187,7 +187,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
TimerData timerData = value.getKey();

Timer.Builder timer =
windmillStateTagUtil.buildWindmillTimerFromTimerData(
windmillTagEncoding.buildWindmillTimerFromTimerData(
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());

if (value.getValue()) {
Expand All @@ -201,7 +201,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
Expand All @@ -210,7 +210,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// Clear the hold in case a previous iteration of this timer set one.
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand All @@ -225,7 +225,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
.setTag(windmillStateTagUtil.timerHoldTag(prefix, timerData))
.setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
.setStateFamily(stateFamily)
.setReset(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
new WindmillKeyedWorkItem<>(
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
key,
workItem,
windowCoder,
windowsCoder,
valueCoder,
context.getWindmillTagEncoding(),
context.getDrainMode());
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ final class CachingStateTable {
private final @Nullable CachingStateTable derivedStateTable;
private final boolean isNewKey;
private final boolean mapStateViaMultimapState;
private final WindmillStateTagUtil windmillStateTagUtil;
private final WindmillTagEncoding windmillTagEncoding;

private CachingStateTable(Builder builder) {
this.stateTable = new HashMap<>();
Expand All @@ -60,7 +60,7 @@ private CachingStateTable(Builder builder) {
this.scopedReadStateSupplier = builder.scopedReadStateSupplier;
this.derivedStateTable = builder.derivedStateTable;
this.mapStateViaMultimapState = builder.mapStateViaMultimapState;
this.windmillStateTagUtil = builder.windmillStateTagUtil;
this.windmillTagEncoding = builder.windmillTagEncoding;
if (this.isSystemTable) {
Preconditions.checkState(derivedStateTable == null);
} else {
Expand All @@ -74,9 +74,9 @@ static Builder builder(
ForKeyAndFamily cache,
boolean isNewKey,
Supplier<Closeable> scopedReadStateSupplier,
WindmillStateTagUtil windmillStateTagUtil) {
WindmillTagEncoding windmillTagEncoding) {
return new Builder(
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillStateTagUtil);
stateFamily, reader, cache, scopedReadStateSupplier, isNewKey, windmillTagEncoding);
}

/**
Expand Down Expand Up @@ -114,7 +114,7 @@ private StateTag.StateBinder binderForNamespace(StateNamespace namespace, StateC
public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
StateTag<BagState<T>> resolvedAddress =
isSystemTable ? StateTags.makeSystemTagInternal(address) : address;
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, resolvedAddress);
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, resolvedAddress);

@Nullable WindmillBag<T> bag = (WindmillBag<T>) cache.get(namespace, encodedKey);
if (bag == null) {
Expand Down Expand Up @@ -144,7 +144,7 @@ public <KeyT, ValueT> AbstractWindmillMap<KeyT, ValueT> bindMap(
new WindmillMapViaMultimap<>(
bindMultimap(internalMultimapAddress, keyCoder, valueCoder));
} else {
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec);
result = (AbstractWindmillMap<KeyT, ValueT>) cache.get(namespace, encodedKey);
if (result == null) {
result =
Expand All @@ -161,7 +161,7 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
StateTag<MultimapState<KeyT, ValueT>> spec,
Coder<KeyT> keyCoder,
Coder<ValueT> valueCoder) {
InternedByteString encodedKey = windmillStateTagUtil.encodeKey(namespace, spec);
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, spec);
WindmillMultimap<KeyT, ValueT> result =
(WindmillMultimap<KeyT, ValueT>) cache.get(namespace, encodedKey);
if (result == null) {
Expand All @@ -177,8 +177,7 @@ public <KeyT, ValueT> WindmillMultimap<KeyT, ValueT> bindMultimap(
public <T> OrderedListState<T> bindOrderedList(
StateTag<OrderedListState<T>> spec, Coder<T> elemCoder) {
StateTag<OrderedListState<T>> specOrInternalTag = addressOrInternalTag(spec);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, specOrInternalTag);
InternedByteString encodedKey = windmillTagEncoding.stateTag(namespace, specOrInternalTag);

WindmillOrderedList<T> result = (WindmillOrderedList<T>) cache.get(namespace, encodedKey);
if (result == null) {
Expand All @@ -202,7 +201,7 @@ public WatermarkHoldState bindWatermark(
StateTag<WatermarkHoldState> address, TimestampCombiner timestampCombiner) {
StateTag<WatermarkHoldState> addressOrInternalTag = addressOrInternalTag(address);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
windmillTagEncoding.stateTag(namespace, addressOrInternalTag);

WindmillWatermarkHold result = (WindmillWatermarkHold) cache.get(namespace, encodedKey);
if (result == null) {
Expand Down Expand Up @@ -231,7 +230,7 @@ public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCom
combineFn,
cache,
isNewKey,
windmillStateTagUtil);
windmillTagEncoding);

result.initializeForWorkItem(reader, scopedReadStateSupplier);
return result;
Expand All @@ -251,7 +250,7 @@ CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(
public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
StateTag<ValueState<T>> addressOrInternalTag = addressOrInternalTag(address);
InternedByteString encodedKey =
windmillStateTagUtil.encodeKey(namespace, addressOrInternalTag);
windmillTagEncoding.stateTag(namespace, addressOrInternalTag);

WindmillValue<T> result = (WindmillValue<T>) cache.get(namespace, encodedKey);
if (result == null) {
Expand Down Expand Up @@ -289,7 +288,7 @@ static class Builder {
private final WindmillStateCache.ForKeyAndFamily cache;
private final Supplier<Closeable> scopedReadStateSupplier;
private final boolean isNewKey;
private final WindmillStateTagUtil windmillStateTagUtil;
private final WindmillTagEncoding windmillTagEncoding;
private boolean isSystemTable;
private @Nullable CachingStateTable derivedStateTable;
private boolean mapStateViaMultimapState = false;
Expand All @@ -300,15 +299,15 @@ private Builder(
ForKeyAndFamily cache,
Supplier<Closeable> scopedReadStateSupplier,
boolean isNewKey,
WindmillStateTagUtil windmillStateTagUtil) {
WindmillTagEncoding windmillTagEncoding) {
this.stateFamily = stateFamily;
this.reader = reader;
this.cache = cache;
this.scopedReadStateSupplier = scopedReadStateSupplier;
this.isNewKey = isNewKey;
this.isSystemTable = true;
this.derivedStateTable = null;
this.windmillStateTagUtil = windmillStateTagUtil;
this.windmillTagEncoding = windmillTagEncoding;
}

Builder withDerivedState(CachingStateTable derivedStateTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ class WindmillCombiningState<InputT, AccumT, OutputT> extends WindmillState
CombineFn<InputT, AccumT, OutputT> combineFn,
ForKeyAndFamily cache,
boolean isNewKey,
WindmillStateTagUtil windmillStateTagUtil) {
WindmillTagEncoding windmillTagEncoding) {
StateTag<BagState<AccumT>> internalBagAddress = StateTags.convertToBagTagInternal(address);
InternedByteString encodeKey = windmillStateTagUtil.encodeKey(namespace, internalBagAddress);
InternedByteString encodeKey = windmillTagEncoding.stateTag(namespace, internalBagAddress);

WindmillBag<AccumT> bag = (WindmillBag<AccumT>) cache.get(namespace, encodeKey);
if (bag == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ public WindmillStateInternals(
WindmillStateReader reader,
boolean isNewKey,
WindmillStateCache.ForKeyAndFamily cache,
WindmillStateTagUtil windmillStateTagUtil,
WindmillTagEncoding windmillTagEncoding,
Supplier<Closeable> scopedReadStateSupplier) {
this.key = key;
this.cache = cache;
this.scopedReadStateSupplier = scopedReadStateSupplier;
CachingStateTable.Builder builder =
CachingStateTable.builder(
stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillStateTagUtil);
stateFamily, reader, cache, isNewKey, scopedReadStateSupplier, windmillTagEncoding);
if (cache.supportMapStateViaMultimapState()) {
builder = builder.withMapStateViaMultimapState();
}
Expand Down
Loading
Loading