[FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add batch e2e test #955
Thanks for the fix. I think we should include the e2e with this PR to make sure it's solid now :)
I've updated the PR with the e2e. Let me know what you think.
gyfora
left a comment
Looks good, added some minor comments. Could you verify that the test fails without the fix?
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
Do we need the HA/checkpoint configs? We could remove them and also simplify the pod template.
e2e-tests/test_batch_job.sh
Outdated
echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..."
sleep "$SLEEP_DURATION"

# Verify the job is *still* in the FINISHED state (second check).
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
Instead of sleeping and checking again, would it make sense to send in a spec change and make sure the operator re-triggers the batch job correctly? I think that would rule out the savepoint observe errors etc.
The idea of sleeping and checking again came from #944, where I fixed a bug in which a bounded streaming job would reach finished and then be set back to reconciling.
I do like the idea of adding a spec change and verifying the batch job reruns and finishes though.
entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--execution-mode", "BATCH"]
parallelism: 2
upgradeMode: stateless
Should we set this to last-state?
I feel that with stateless we won't be able to verify that the savepoint observe logic is successful
We could. Although the operator logic still reaches getLastCheckpoint even in stateless mode. Does last-state even make sense for batch jobs given they don't support checkpointing?
I think you are right, we could leave this in stateless. To avoid sleeping and make sure that it is reconciled and stays in finished, we can simply add a no-op spec change (such as changing some operator config that doesn't result in a new job deployment) or change the upgrade mode.
@gyfora Makes sense. Updated.
Thanks for the review @gyfora. I've made some updates based on your comments. I have also:
Full CI test matrix just finished running for our internal fork. The batch tests for Flink versions 1.18 and under failed with the below error. I think it would be ok to just run the batch tests for versions 1.19 and up. Thoughts?
Makes sense

What is the purpose of the change
This pull request is a follow-up to #948. It changes the exception type that is caught from
org.apache.flink.runtime.rest.util.RestClientException to java.util.concurrent.ExecutionException. To make batch testing more robust, I have also introduced a batch e2e test with this PR.
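As a rough illustration of why the caught type matters, the sketch below shows the general Java behavior involved: when an asynchronous call fails, `CompletableFuture.get()` surfaces the failure as a `java.util.concurrent.ExecutionException` with the original exception as its cause. This is not the operator's code. `StandInRestException`, `fetchLastCheckpoint`, `describeLastCheckpoint`, and the error message are hypothetical names invented here so the example is self-contained, and whether the operator's REST call follows exactly this path is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CheckpointLookupSketch {

    /** Hypothetical stand-in for a REST client exception; not the Flink class. */
    static class StandInRestException extends Exception {
        StandInRestException(String message) {
            super(message);
        }
    }

    /** Simulates an async REST call that fails because checkpointing is not enabled. */
    static CompletableFuture<String> fetchLastCheckpoint() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Illustrative message only; the real error text may differ.
        future.completeExceptionally(
                new StandInRestException("checkpointing is not enabled"));
        return future;
    }

    /** Waits for the result and inspects the wrapped cause of any failure. */
    static String describeLastCheckpoint() {
        try {
            return fetchLastCheckpoint().get();
        } catch (ExecutionException e) {
            // get() wraps the original failure, so the handler has to look at getCause().
            Throwable cause = e.getCause();
            if (cause instanceof StandInRestException) {
                return "no checkpoint: " + cause.getMessage();
            }
            throw new IllegalStateException("Unexpected failure", cause);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeLastCheckpoint());
    }
}
```

In this sketch a handler written only for `StandInRestException` would never see the failure, because `get()` delivers it wrapped in `ExecutionException`. Catching the wrapper and checking its cause is what lets the "not enabled" case be handled.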
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
CustomResourceDescriptors: no
Documentation