Skip to content

Commit f15ce02

Browse files
authored
[Dataflow Streaming] WindmillTimerInternals: Use a single map to store timer data + liveness (#34292)
1 parent 65887a0 commit f15ce02

File tree

1 file changed

+17
-15
lines changed

1 file changed

+17
-15
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@
2121
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
2222

2323
import java.io.IOException;
24+
import java.util.AbstractMap.SimpleEntry;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Map.Entry;
2428
import java.util.function.Consumer;
2529
import org.apache.beam.runners.core.StateNamespace;
2630
import org.apache.beam.runners.core.StateNamespaces;
@@ -35,9 +39,6 @@
3539
import org.apache.beam.sdk.util.VarInt;
3640
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
3741
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
38-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
39-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
40-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table.Cell;
4142
import org.checkerframework.checker.nullness.qual.Nullable;
4243
import org.joda.time.Duration;
4344
import org.joda.time.Instant;
@@ -64,10 +65,9 @@ class WindmillTimerInternals implements TimerInternals {
6465
// though technically in Windmill this is only enforced per ID and namespace
6566
// and TimeDomain. This TimerInternals is scoped to a step and key, shared
6667
// across namespaces.
67-
private final Table<String, StateNamespace, TimerData> timers = HashBasedTable.create();
68-
69-
// Map from timer id to whether it is to be deleted or set
70-
private final Table<String, StateNamespace, Boolean> timerStillPresent = HashBasedTable.create();
68+
private final Map<
69+
Entry<String /*ID*/, StateNamespace>, Entry<TimerData, Boolean /*timer set/unset*/>>
70+
timerMap = new HashMap<>();
7171

7272
private final Watermarks watermarks;
7373
private final Instant processingTime;
@@ -96,8 +96,9 @@ public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
9696
@Override
9797
public void setTimer(TimerData timerKey) {
9898
String timerDataKey = getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId());
99-
timers.put(timerDataKey, timerKey.getNamespace(), timerKey);
100-
timerStillPresent.put(timerDataKey, timerKey.getNamespace(), true);
99+
timerMap.put(
100+
new SimpleEntry<>(timerDataKey, timerKey.getNamespace()),
101+
new SimpleEntry<>(timerKey, true));
101102
onTimerModified.accept(timerKey);
102103
}
103104

@@ -126,8 +127,9 @@ private static String getTimerDataKey(String timerId, String timerFamilyId) {
126127
@Override
127128
public void deleteTimer(TimerData timerKey) {
128129
String timerDataKey = getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId());
129-
timers.put(timerDataKey, timerKey.getNamespace(), timerKey);
130-
timerStillPresent.put(timerDataKey, timerKey.getNamespace(), false);
130+
timerMap.put(
131+
new SimpleEntry<>(timerDataKey, timerKey.getNamespace()),
132+
new SimpleEntry<>(timerKey, false));
131133
onTimerModified.accept(timerKey.deleted());
132134
}
133135

@@ -189,16 +191,16 @@ public Instant currentInputWatermarkTime() {
189191
}
190192

191193
public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
192-
for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
194+
for (Entry<TimerData, Boolean> value : timerMap.values()) {
193195
// Regardless of whether it is set or not, it must have some TimerData stored so we
194196
// can know its time domain
195-
TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
197+
TimerData timerData = value.getKey();
196198

197199
Timer.Builder timer =
198200
buildWindmillTimerFromTimerData(
199201
stateFamily, prefix, timerData, outputBuilder.addOutputTimersBuilder());
200202

201-
if (cell.getValue()) {
203+
if (value.getValue()) {
202204
// Setting the timer. If it is a user timer, set a hold.
203205

204206
// Only set a hold if it's needed and if the hold is before the end of the global window.
@@ -241,7 +243,7 @@ public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
241243
}
242244

243245
// Wipe the unpersisted state
244-
timers.clear();
246+
timerMap.clear();
245247
}
246248

247249
private boolean needsWatermarkHold(TimerData timerData) {

0 commit comments

Comments
 (0)