Conversation

@luca-p-castelli (Contributor) commented on Feb 21, 2025

What is the purpose of the change

This pull request fixes a bug where, during the observation phase, the operator tries to observe savepoint information for batch jobs and fails because checkpointing is not enabled for batch jobs.

More information in: https://issues.apache.org/jira/browse/FLINK-37370

Brief change log

  1. Modifies JobDetailsInfo and downstream tests to include job-type.
  2. Adds a method to FlinkService and AbstractFlinkService to fetch JobDetailsInfo for the given job.
  3. Adds a method to SnapshotObserver to check whether a job is a batch job; it is used to skip observing checkpoints/savepoints for batch jobs (sketched below).
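
The intent, roughly, is that the observer fetches the job's JobDetailsInfo through the Flink service, reads the new job-type field, and returns early before checkpoint/savepoint observation when the job is a batch job. A minimal sketch follows; the names getJobDetailsInfo, isBatchJob, and the JobType accessor are assumptions for illustration, not the exact signatures introduced by this PR.

```java
// Illustrative sketch only: method names and return types are assumptions,
// not the exact signatures introduced by this PR.
private boolean isBatchJob(FlinkResourceContext<?> ctx, String jobId) throws Exception {
    var details =
            ctx.getFlinkService()
                    .getJobDetailsInfo(JobID.fromHexString(jobId), ctx.getObserveConfig());
    // The new "job-type" field distinguishes BATCH from STREAMING jobs.
    return details.getJobType() == JobType.BATCH;
}

// Conceptually, in SnapshotObserver before checkpoint/savepoint observation:
//     if (isBatchJob(ctx, jobId)) {
//         return; // batch jobs have no checkpointing, nothing to observe
//     }
```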

Some questions

  1. There is a local version of JobDetailsInfo in the operator code. The existing comment says it can be removed once the client is upgraded to 1.20, so I tried to remove the local version, but I ran into issues with the non-null requirement for slotSharingGroupId and jobType, particularly with the flink15 and flink16 compatibility tests. For now I've kept the local version and added job-type to it (see the sketch after this list). Open to suggestions if you think there is a better way to handle this.
  2. When running a batch job there are still some exceptions logged as warnings that originate in populateStateSize, also related to checkpointing not being enabled for batch jobs. I didn't change anything there since those warnings/exceptions don't break anything. Open to suggestions if you think we shouldn't leave this as is.
  3. JobAutoScalerImplTest.testMetricReporting seemed flaky when running tests locally. Has this been observed before?
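
For question 1, a minimal sketch of what adding job-type to the local JobDetailsInfo copy could look like, assuming the local class is Jackson-deserialized like the upstream one; the field name, annotations, and nullability shown here are assumptions for illustration, not the committed code:

```java
// Sketch only: annotation style, field type, and nullability are assumptions about
// the operator's local JobDetailsInfo copy, not the committed code.
public class JobDetailsInfo {

    public static final String FIELD_NAME_JOB_TYPE = "job-type";

    // Kept nullable so responses from older Flink versions (1.15/1.16) that do not
    // report a job type can still be deserialized in the compatibility tests.
    @JsonProperty(FIELD_NAME_JOB_TYPE)
    @Nullable
    private final String jobType;

    // ... other JobDetailsInfo fields and constructor parameters omitted for brevity

    @JsonCreator
    public JobDetailsInfo(@JsonProperty(FIELD_NAME_JOB_TYPE) @Nullable String jobType) {
        this.jobType = jobType;
    }

    @Nullable
    public String getJobType() {
        return jobType;
    }
}
```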

Verifying this change

  • All existing tests pass (a couple of tests were modified to include job-type)
  • Tested locally with the manifest attached to the issue to make sure the job successfully reaches the FINISHED job status

If you think my approach is good, as a next step I'm happy to add some tests for the new logic I've introduced. Where would you like to see tests?

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@luca-p-castelli marked this pull request as ready for review on February 21, 2025, 17:49
                    });
} catch (Exception e) {
    if (ExceptionUtils.findThrowable(e, RestClientException.class)
            .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled"))

Contributor:

Is message always not null here?


@luca-p-castelli (Contributor, Author) replied on Feb 24, 2025:

Good point. I'm not sure. I'll add logic to check for null.
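
One null-safe variant (a sketch only, not necessarily the exact change that was committed) maps to the message first and then filters, so a null message simply makes the condition false:

```java
// Sketch of a null-safe variant: Optional.map on a null message yields empty,
// so the filter is never invoked and the condition is simply false.
if (ExceptionUtils.findThrowable(e, RestClientException.class)
        .map(RestClientException::getMessage)
        .filter(msg -> msg.contains("Checkpointing has not been enabled"))
        .isPresent()) {
    LOG.warn(
            "Checkpointing not enabled for job {}, skipping checkpoint observation",
            jobId,
            e);
}
```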

Comment on lines 450 to 469
    ctx.getFlinkService()
            .getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
            .ifPresentOrElse(
                    snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
                    () -> {
                        if (ReconciliationUtils.isJobCancelled(status)) {
                            // For cancelled jobs the observed savepoint is always definite,
                            // so if empty we know the job doesn't have any
                            // checkpoints/savepoints
                            jobStatus.setUpgradeSavepointPath(null);
                        }
                    });
} catch (Exception e) {
    if (ExceptionUtils.findThrowable(e, RestClientException.class)
            .map(ex -> ex.getMessage().contains("Checkpointing has not been enabled"))
            .orElse(false)) {
        LOG.warn(
                "Checkpointing not enabled for job {}, skipping checkpoint observation",
                jobId,
                e);

Contributor:

I wonder if the try/catch logic should be part of getLastCheckpoint in the Flink service. That way, anywhere else we call this in the future, we get consistently good behaviour for batch jobs.


Contributor (Author):

Yeah, I thought about that. I'm open to either. We would just have getLastCheckpoint catch the exception and return Optional.empty().


Contributor:

I think that would make sense 👍
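
A sketch of that follow-up, assuming getLastCheckpoint keeps an Optional<Savepoint> return type and may throw; the helper queryLastCheckpoint is hypothetical and only stands in for the existing REST lookup. The service swallows the "checkpointing not enabled" error itself and returns Optional.empty(), so every caller gets the same batch-friendly behaviour:

```java
// Sketch only: the real FlinkService#getLastCheckpoint signature, return type, and
// internals may differ; the point is to handle the batch case inside the service.
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) throws Exception {
    try {
        return queryLastCheckpoint(jobId, conf); // hypothetical helper doing the REST call
    } catch (Exception e) {
        boolean checkpointingDisabled =
                ExceptionUtils.findThrowable(e, RestClientException.class)
                        .map(RestClientException::getMessage)
                        .filter(msg -> msg.contains("Checkpointing has not been enabled"))
                        .isPresent();
        if (checkpointingDisabled) {
            // Batch jobs have no checkpointing; report "no checkpoint" instead of failing,
            // so every caller gets consistent behaviour.
            LOG.warn("Checkpointing not enabled for job {}, returning no last checkpoint", jobId);
            return Optional.empty();
        }
        throw e;
    }
}
```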

@gyfora merged commit 9eb3c38 into apache:main on Feb 25, 2025
118 checks passed