diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java index 02d67f5261ff..21b4f64f9247 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -32,9 +32,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -293,7 +293,7 @@ public void testTeardownCalledAfterExceptionInFinishBundleStateful() { @Before public void setup() { - ExceptionThrowingFn.callStateMap = new HashMap<>(); + ExceptionThrowingFn.callStateMap.clear(); ExceptionThrowingFn.exceptionWasThrown.set(false); } @@ -356,7 +356,7 @@ CallState finalState() { } private static class ExceptionThrowingFn extends DoFn { - static HashMap callStateMap = new HashMap<>(); + static Map callStateMap = new ConcurrentHashMap<>(); // exception is not necessarily thrown on every instance. But we expect at least // one during tests static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); @@ -373,7 +373,10 @@ private static void validate(CallState... requiredCallStates) { Map callStates; synchronized (ExceptionThrowingFn.class) { callStates = - (Map) ExceptionThrowingFn.callStateMap.clone(); + (Map) + Collections.synchronizedMap( + ExceptionThrowingFn.callStateMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))); } assertThat(callStates, is(not(anEmptyMap()))); // assert that callStateMap contains only TEARDOWN as a value. Note: We do not expect