From 34810b4839e6b76607153301c66ec5cabb12c2c9 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Mon, 24 Nov 2025 18:03:17 +0100 Subject: [PATCH] callStateMap was accessed from multiple threads without synchronization. changing callStateMap to concurrent hashmap --- .../beam/sdk/transforms/ParDoLifecycleTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java index 02d67f5261ff..21b4f64f9247 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -32,9 +32,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -293,7 +293,7 @@ public void testTeardownCalledAfterExceptionInFinishBundleStateful() { @Before public void setup() { - ExceptionThrowingFn.callStateMap = new HashMap<>(); + ExceptionThrowingFn.callStateMap.clear(); ExceptionThrowingFn.exceptionWasThrown.set(false); } @@ -356,7 +356,7 @@ CallState finalState() { } private static class ExceptionThrowingFn extends DoFn { - static HashMap callStateMap = new HashMap<>(); + static Map callStateMap = new ConcurrentHashMap<>(); // exception is not necessarily thrown on every instance. But we expect at least // one during tests static AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); @@ -373,7 +373,10 @@ private static void validate(CallState... requiredCallStates) { Map callStates; synchronized (ExceptionThrowingFn.class) { callStates = - (Map) ExceptionThrowingFn.callStateMap.clone(); + (Map) + Collections.synchronizedMap( + ExceptionThrowingFn.callStateMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))); } assertThat(callStates, is(not(anEmptyMap()))); // assert that callStateMap contains only TEARDOWN as a value. Note: We do not expect