Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
Expand All @@ -35,9 +39,6 @@
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table.Cell;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand All @@ -64,10 +65,8 @@ class WindmillTimerInternals implements TimerInternals {
// though technically in Windmill this is only enforced per ID and namespace
// and TimeDomain. This TimerInternals is scoped to a step and key, shared
// across namespaces.
private final Table<String, StateNamespace, TimerData> timers = HashBasedTable.create();

// Map from timer id to whether it is to be deleted or set
private final Table<String, StateNamespace, Boolean> timerStillPresent = HashBasedTable.create();
private final Map<Entry<String /*ID*/, StateNamespace>, Entry<TimerData, Boolean>> timerMap =
new LinkedHashMap<>();

private final Watermarks watermarks;
private final Instant processingTime;
Expand Down Expand Up @@ -96,8 +95,9 @@ public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
@Override
public void setTimer(TimerData timerKey) {
String timerDataKey = getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId());
timers.put(timerDataKey, timerKey.getNamespace(), timerKey);
timerStillPresent.put(timerDataKey, timerKey.getNamespace(), true);
timerMap.put(
new SimpleEntry<>(timerDataKey, timerKey.getNamespace()),
new SimpleEntry<>(timerKey, true));
onTimerModified.accept(timerKey);
}

Expand Down Expand Up @@ -126,8 +126,9 @@ private static String getTimerDataKey(String timerId, String timerFamilyId) {
@Override
public void deleteTimer(TimerData timerKey) {
String timerDataKey = getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId());
timers.put(timerDataKey, timerKey.getNamespace(), timerKey);
timerStillPresent.put(timerDataKey, timerKey.getNamespace(), false);
timerMap.put(
new SimpleEntry<>(timerDataKey, timerKey.getNamespace()),
new SimpleEntry<>(timerKey, false));
onTimerModified.accept(timerKey.deleted());
}

Expand Down Expand Up @@ -189,16 +190,16 @@ public Instant currentInputWatermarkTime() {
}

public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
for (Entry<TimerData, Boolean> value : timerMap.values()) {
// Regardless of whether it is set or not, it must have some TimerData stored so we
// can know its time domain
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
TimerData timerData = value.getKey();

Timer.Builder timer =
buildWindmillTimerFromTimerData(
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());

if (cell.getValue()) {
if (value.getValue()) {
// Setting the timer. If it is a user timer, set a hold.

// Only set a hold if it's needed and if the hold is before the end of the global window.
Expand Down Expand Up @@ -241,7 +242,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
}

// Wipe the unpersisted state
timers.clear();
timerMap.clear();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timerStillPresent was not cleared before, It is safe to clear both timerdata and boolen here since WindmillTimerInternals is recreated at the bundle level.

}

private boolean needsWatermarkHold(TimerData timerData) {
Expand Down
Loading