5050
5151import java .time .Duration ;
5252import java .util .ArrayList ;
53- import java .util .HashMap ;
5453import java .util .List ;
5554import java .util .Map ;
5655import java .util .Optional ;
5756import java .util .Set ;
5857import java .util .SortedMap ;
5958import java .util .TreeMap ;
59+ import java .util .concurrent .ConcurrentHashMap ;
6060import java .util .concurrent .Executors ;
6161import java .util .concurrent .ScheduledExecutorService ;
6262import java .util .concurrent .TimeUnit ;
@@ -87,7 +87,7 @@ public class FlinkConfigManager {
8787 private final AtomicLong defaultConfigVersion = new AtomicLong (0 );
8888 private final LoadingCache <Key , Configuration > cache ;
8989 private final Consumer <Set <String >> namespaceListener ;
90- private volatile Map <FlinkVersion , List <String >> relevantFlinkVersionPrefixes ;
90+ private volatile ConcurrentHashMap <FlinkVersion , List <String >> relevantFlinkVersionPrefixes ;
9191
9292 protected static final Pattern FLINK_VERSION_PATTERN =
9393 Pattern .compile (
@@ -114,7 +114,7 @@ public FlinkConfigManager(
114114 this .namespaceListener = namespaceListener ;
115115 Duration cacheTimeout =
116116 defaultConfig .get (KubernetesOperatorConfigOptions .OPERATOR_CONFIG_CACHE_TIMEOUT );
117- this .relevantFlinkVersionPrefixes = new HashMap <>();
117+ this .relevantFlinkVersionPrefixes = new ConcurrentHashMap <>();
118118 this .cache =
119119 CacheBuilder .newBuilder ()
120120 .maximumSize (
@@ -189,7 +189,7 @@ public void updateDefaultConfig(Configuration newConf) {
189189 // We clear the cached relevant Flink version prefixes as the base config may include new
190190 // version overrides.
191191 // This will trigger a regeneration of the prefixes in the next call to getDefaultConfig.
192- relevantFlinkVersionPrefixes = new HashMap <>();
192+ relevantFlinkVersionPrefixes = new ConcurrentHashMap <>();
193193 }
194194
195195 /**
0 commit comments