Skip to content

Commit 61b8f41

Browse files
authored
Changes multimap entries Iterable to make a deep copy of pending adds and deletes (#36759)
1 parent 49de281 commit 61b8f41

File tree

1 file changed

+12
-4
lines changed

1 file changed

+12
-4
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747
import org.apache.beam.sdk.util.ByteStringOutputStream;
4848
import org.apache.beam.sdk.values.KV;
4949
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
50-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
51-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
5250
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
5351
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
5452
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -273,7 +271,12 @@ public PrefetchableIterable<Map.Entry<K, V>> entries() {
273271
keysStateRequest.getStateKey());
274272
// Make a deep copy of pendingAdds so this iterator represents a snapshot of state at the time
275273
// it was created.
276-
Map<Object, KV<K, List<V>>> pendingAddsNow = ImmutableMap.copyOf(pendingAdds);
274+
Map<Object, KV<K, List<V>>> pendingAddsNow = new HashMap<>();
275+
for (Map.Entry<Object, KV<K, List<V>>> entry : pendingAdds.entrySet()) {
276+
pendingAddsNow.put(
277+
entry.getKey(),
278+
KV.of(entry.getValue().getKey(), new ArrayList<>(entry.getValue().getValue())));
279+
}
277280
if (isCleared) {
278281
return PrefetchableIterables.maybePrefetchable(
279282
Iterables.concat(
@@ -285,7 +288,12 @@ public PrefetchableIterable<Map.Entry<K, V>> entries() {
285288
value -> Maps.immutableEntry(entry.getValue().getKey(), value)))));
286289
}
287290

288-
Set<Object> pendingRemovesNow = ImmutableSet.copyOf(pendingRemoves.keySet());
291+
// Make a deep copy of pendingRemoves so this iterator represents a snapshot of state at the
292+
// time it was created.
293+
Set<Object> pendingRemovesNow = new HashSet<>();
294+
for (Object key : pendingRemoves.keySet()) {
295+
pendingRemovesNow.add(key);
296+
}
289297
return new PrefetchableIterables.Default<Map.Entry<K, V>>() {
290298
@Override
291299
public PrefetchableIterator<Map.Entry<K, V>> createIterator() {

0 commit comments

Comments
 (0)