Skip to content

Conversation

@gyfora
Copy link
Contributor

@gyfora gyfora commented May 26, 2025

What is the purpose of the change

Initialize last triggered event timestamp correctly from kubernetes events + fix 2.0 compatibility

Verifying this change

Manually verified (Flink 1.18-2.0) + Unit tests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: no

lastExceptionTs =
EventUtils.findLastJobExceptionTsFromK8s(
ctx.getKubernetesClient(), resource)
.orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
Copy link
Contributor

@vsantwana vsantwana May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.orElse(Instant.now().minus(MAX_K8S_EVENT_AGE));
.orElse(k8sExpirationTs);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I cleaned up / simplified the duplicated code in the method in a new commit, please check :)

@vsantwana
Copy link
Contributor

Thanks @gyfora for the PR!
Left two very minor comments

@rmetzger
Copy link

I tested this PR on a dev env yesterday, and it all works (against Flink 1.19)

Copy link
Contributor

@vsantwana vsantwana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!
Thanks @gyfora

@gyfora gyfora merged commit 3c60c3c into apache:main May 27, 2025
130 checks passed
Comment on lines 157 to 160
if (maxJobExceptionTs.isBefore(k8sExpirationTs)) {
// If the last job exception was a long time ago, then there is no point in
// checking in k8s.
lastExceptionTs = maxJobExceptionTs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for this optimization? It complicates the code by adding another setting. It also requires the user to tune just another setting. There is no harm in calling out to the k8s api regularly to fetch events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no config for this (nothing to tune) and the optimization can be very important when the operator starts up because then the cache is empty and it would fetch events for every single job. In most cases this filter completely eliminates that so this greatly reduces the startup api server load

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point. The value is hardcoded. We would only query for the jobs with exceptions, but still those could amount to quite some jobs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants