Skip to content

Commit 738821c

Browse files
committed
[FLINK-36529] Cached the flink version default config prefixes
Signed-off-by: Thomas Cooper <[email protected]>
1 parent 2e05ee6 commit 738821c

File tree

2 files changed

+17
-4
lines changed

2 files changed

+17
-4
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
117117
| v1_18 | |
118118
| v1_19 | |
119119
| v1_20 | |
120+
| majorVersion | int | The major integer from the Flink semver. For example for Flink 1.18.1 this would be 1. |
121+
| minorVersion | int | The minor integer from the Flink semver. For example for Flink 1.18.1 this would be 18. |
120122

121123
### IngressSpec
122124
**Class**: org.apache.flink.kubernetes.operator.api.spec.IngressSpec

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import java.time.Duration;
5252
import java.util.ArrayList;
53+
import java.util.HashMap;
5354
import java.util.List;
5455
import java.util.Map;
5556
import java.util.Optional;
@@ -86,6 +87,7 @@ public class FlinkConfigManager {
8687
private final AtomicLong defaultConfigVersion = new AtomicLong(0);
8788
private final LoadingCache<Key, Configuration> cache;
8889
private final Consumer<Set<String>> namespaceListener;
90+
private volatile Map<FlinkVersion, List<String>> relevantFlinkVersionPrefixes;
8991

9092
protected static final Pattern FLINK_VERSION_PATTERN =
9193
Pattern.compile(
@@ -112,6 +114,7 @@ public FlinkConfigManager(
112114
this.namespaceListener = namespaceListener;
113115
Duration cacheTimeout =
114116
defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
117+
this.relevantFlinkVersionPrefixes = new HashMap<>();
115118
this.cache =
116119
CacheBuilder.newBuilder()
117120
.maximumSize(
@@ -182,6 +185,11 @@ public void updateDefaultConfig(Configuration newConf) {
182185
// We do not invalidate the cache to avoid deleting currently used temp files,
183186
// simply bump the version
184187
this.defaultConfigVersion.incrementAndGet();
188+
189+
// We clear the cached relevant Flink version prefixes as the base config may include new
190+
// version overrides.
191+
// This will trigger a regeneration of the prefixes in the next call to getDefaultConfig.
192+
relevantFlinkVersionPrefixes = new HashMap<>();
185193
}
186194

187195
/**
@@ -292,10 +300,13 @@ public Configuration getDefaultConfig(String namespace, FlinkVersion flinkVersio
292300
}
293301

294302
if (flinkVersion != null) {
295-
// Get a list of flink version configs that apply to this current flink version
296-
// That will include all versions that are equal to or lower than the current one
297-
// that are suffixed by a `+`
298-
List<String> versionPrefixes = getRelevantVersionPrefixes(baseConfMap, flinkVersion);
303+
// Fetch or create a list of Flink version configs that apply to this current
304+
// FlinkVersion. That will include all versions that are equal to or lower than
305+
// the current one that are suffixed by a `+`
306+
List<String> versionPrefixes =
307+
relevantFlinkVersionPrefixes.computeIfAbsent(
308+
flinkVersion,
309+
fv -> getRelevantVersionPrefixes(baseConfMap, flinkVersion));
299310

300311
// The version prefixes are returned in ascending order of Flink version, so configs
301312
// attached to newer versions will override older ones. For example v1_16+.conf1 will

0 commit comments

Comments
 (0)