-
Notifications
You must be signed in to change notification settings - Fork 498
[FLINK-36529] Allow Flink version configs to be set to greater than given version #918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36529] Allow Flink version configs to be set to greater than given version #918
Conversation
|
I probably need to look at adding an end to end test for this but want to make sure I am on the right track before doing that. |
|
CC @gyfora |
|
Looks great @tomncooper thanks for picking this up, I think this functionality is fairly well unit testable so I would not bother with an e2e (that would just add to our build time) |
|
I will try to review this in detail tomorrow but scrolling through it, it look pretty good already |
ce3d191 to
28f9c67
Compare
| for (Map.Entry<String, String> entry : baseConfMap.entrySet()) { | ||
| Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey()); | ||
| if (versionMatcher.matches() && versionMatcher.group("gt") != null) { | ||
| try { | ||
| FlinkVersion keyFlinkVersion = | ||
| FlinkVersion.fromMajorMinor( | ||
| Integer.parseInt(versionMatcher.group("major")), | ||
| Integer.parseInt(versionMatcher.group("minor"))); | ||
| if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) { | ||
| greaterThanVersionPrefixes.put( | ||
| keyFlinkVersion, | ||
| VERSION_CONF_PREFIX | ||
| + keyFlinkVersion | ||
| + KubernetesOperatorConfigOptions | ||
| .FLINK_VERSION_GREATER_THAN_SUFFIX | ||
| + "."); | ||
| } | ||
| } catch (NumberFormatException numberFormatException) { | ||
| LOG.warn("Unable to parse version number in config key: {}", entry.getKey()); | ||
| } catch (IllegalArgumentException illegalArgumentException) { | ||
| LOG.warn("Unknown Flink version in config key: {}", entry.getKey()); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't we generate the relevant version prefix list once for each supported Flink version? That way we wouldn't need to do this again and again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean, for every supported version create all possible prefixes and then loop through them for the given FlinkVersion? That would simplify things, but we would have wasted calls to applyDefault which scans the whole base config each time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation scans baseConfig once and only creates prefixes for version that are there. Of course I could refactor applyDefault to accept a list of prefixes and check each config against them. Which may be less checks overall.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could also do this once, when the base config changes in the config manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I added a cache (map) in the config manager (keyed on flink version) and call that cache inside getDefaultConfig. I clear the cache in updateDefaultConfig so we only do the calc once for each FlinkVersion:baseConfig combination.
…iven version Signed-off-by: Thomas Cooper <[email protected]>
28f9c67 to
2e05ee6
Compare
7261b9d to
3f62577
Compare
|
@gyfora Looks like the CI was cancelled rather than failed? |
|
There is an error in the CI |
Signed-off-by: Thomas Cooper <[email protected]>
3f62577 to
738821c
Compare
What is the purpose of the change
The operator currently allows the following syntax for defining flink version specific defaults:
kubernetes.operator.default-configuration.flink-version.v1_18.key: valueThe problem with this is that, in many cases, these defaults should be applied to newer Flink versions as well, forcing config duplications.
This PR introduces a new "greater than" syntax for config defaults, indicating that they should be applied to a given version and above:
kubernetes.operator.default-configuration.flink-version.v1_18+.key: valueIn this case key:value would be applied to all Flink version greater or equal to 1.18, unless overridden for specific versions.
Brief change log
getRelevantVersionPrefixes, to theorg.apache.flink.kubernetes.operator.config.FlinkConfigManager, which identifies all Flink version default config prefixes which are relevant to the currently specified Flink version. These are then saved in cache (map), keyed on Flink version.getDefaultConfigto call the cached flink version prefixes or generated them if none exist.updateDefaultConfigis called. This ensures new prefixes will be calculated on the next call togetDefaultConfig.FlinkVersionenum to specify major and minor semver integers to facilitate quick look up of relevant Flink versions when parsing version strings.Verifying this change
This change added tests and can be verified as follows:
FlinkConfigManagerTestclass to cover the new Regex andgetRelevantVersionPrefixesmethods.testVersionNamespaceDefaultConfstest inFlinkConfigManagerTestto test the greater than version behaviour.FlinkVersionTestto ensure refactoring of the enum was correct.Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: noDocumentation