diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java index 8e3d76f5fc8f..83d78ff836c7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapUserState.java @@ -47,8 +47,6 @@ import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; @@ -273,7 +271,12 @@ public PrefetchableIterable> entries() { keysStateRequest.getStateKey()); // Make a deep copy of pendingAdds so this iterator represents a snapshot of state at the time // it was created. - Map>> pendingAddsNow = ImmutableMap.copyOf(pendingAdds); + Map>> pendingAddsNow = new HashMap<>(); + for (Map.Entry>> entry : pendingAdds.entrySet()) { + pendingAddsNow.put( + entry.getKey(), + KV.of(entry.getValue().getKey(), new ArrayList<>(entry.getValue().getValue()))); + } if (isCleared) { return PrefetchableIterables.maybePrefetchable( Iterables.concat( @@ -285,7 +288,12 @@ public PrefetchableIterable> entries() { value -> Maps.immutableEntry(entry.getValue().getKey(), value))))); } - Set pendingRemovesNow = ImmutableSet.copyOf(pendingRemoves.keySet()); + // Make a deep copy of pendingRemoves so this iterator represents a snapshot of state at the + // time it was created. + Set pendingRemovesNow = new HashSet<>(); + for (Object key : pendingRemoves.keySet()) { + pendingRemovesNow.add(key); + } return new PrefetchableIterables.Default>() { @Override public PrefetchableIterator> createIterator() {