Skip to content

Conversation

@luca-p-castelli
Copy link
Contributor

@luca-p-castelli luca-p-castelli commented Mar 12, 2025

What is the purpose of the change

This pull request is a follow-up to #948. It corrects the exception that is caught from org.apache.flink.runtime.rest.util.RestClientException to java.util.concurrent.ExecutionException.

To make batch testing more robust, I have also introduced a batch e2e test with this PR.

Brief change log

  • Updates the Exception and corresponding test
  • Introduces a batch e2e test for the operator that uses the bundled Flink WordCount example

Verifying this change

  • Existing tests
  • Newly added batch e2e test
  • Tested locally with Minikube and the batch manifest in the JIRA ticket

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@luca-p-castelli luca-p-castelli changed the title Fix checkpointing not enabled exception and add batch e2e test [FLINK-37370] [Observer] Fix Exception caught when gracefully handling checkpointing not enabled for batch jobs and add E2E batch test Mar 12, 2025
@luca-p-castelli luca-p-castelli changed the title [FLINK-37370] [Observer] Fix Exception caught when gracefully handling checkpointing not enabled for batch jobs and add E2E batch test [FLINK-37370] [Observer] Fix exception caught when gracefully handling checkpointing not enabled for batch jobs and add E2E batch test Mar 12, 2025
@luca-p-castelli luca-p-castelli changed the title [FLINK-37370] [Observer] Fix exception caught when gracefully handling checkpointing not enabled for batch jobs and add E2E batch test [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add E2E batch test Mar 12, 2025
@luca-p-castelli luca-p-castelli changed the title [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add E2E batch test [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs Mar 13, 2025
@luca-p-castelli luca-p-castelli force-pushed the lc.checkpointing-not-enabled-exception-fix branch from 09e6c01 to ab21802 Compare March 13, 2025 11:26
@luca-p-castelli luca-p-castelli marked this pull request as ready for review March 13, 2025 11:40
@gyfora
Copy link
Contributor

gyfora commented Mar 13, 2025

Thanks for the fix, I think we should include the e2e with this PR to make sure it's solid now :)

@luca-p-castelli luca-p-castelli force-pushed the lc.checkpointing-not-enabled-exception-fix branch from ab21802 to 1d39e30 Compare March 13, 2025 19:09
@luca-p-castelli luca-p-castelli marked this pull request as draft March 13, 2025 19:11
@luca-p-castelli luca-p-castelli force-pushed the lc.checkpointing-not-enabled-exception-fix branch from ee99f6f to d184526 Compare March 14, 2025 00:48
@luca-p-castelli luca-p-castelli changed the title [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs [FLINK-37370] [Observer] Fix exception caught when handling checkpointing not enabled for batch jobs and add batch e2e test Mar 14, 2025
…ting not enabled for batch jobs and add batch e2e test
@luca-p-castelli luca-p-castelli force-pushed the lc.checkpointing-not-enabled-exception-fix branch from 0c9d1b0 to f34482d Compare March 14, 2025 01:39
@luca-p-castelli luca-p-castelli marked this pull request as ready for review March 14, 2025 01:40
@luca-p-castelli
Copy link
Contributor Author

Thanks for the fix, I think we should include the e2e with this PR to make sure it's solid now :)

I've updated the PR with the e2e. Let me know what you think.

Copy link
Contributor

@gyfora gyfora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, added some minor comments , could you verify that without the fix the test fails?

Comment on lines 34 to 37
high-availability.type: kubernetes
high-availability.storageDir: file:///opt/flink/volume/flink-ha
state.checkpoints.dir: file:///opt/flink/volume/flink-cp
state.savepoints.dir: file:///opt/flink/volume/flink-sp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need HA/checkpoint configs? We could remove this and also simplify the pod template

Comment on lines 46 to 50
echo "Job reached FINISHED state. Sleeping for $SLEEP_DURATION seconds..."
sleep "$SLEEP_DURATION"

# Verify the job is *still* in the FINISHED state (second check).
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' FINISHED $TIMEOUT || exit 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of sleeping and checking again, would it make sense to maybe send in a spec change and make sure the operator re-triggers the batch job correctly? I think that would rule out the savepoint observe errors etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind the sleeping and checking again came from the fact that in #944, I fixed a bug where a bounded streaming job would reach finished and then would be set back to reconciling.

I do like the idea of adding a spec change and verifying the batch job reruns and finishes though.

entryClass: org.apache.flink.streaming.examples.wordcount.WordCount
args: ["--execution-mode", "BATCH"]
parallelism: 2
upgradeMode: stateless
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set this to last-state?
I feel that with stateless we won't be able to verify that the savepoint observe logic is successful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. Although the operator logic still reaches getLastCheckpoint even in stateless mode. Does last-state even make sense for batch jobs given they don't support checkpointing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, we could leave this in stateless. To avoid sleeping and make sure that is reconciled and stays in finished we can simply add a no-op spec change (such as change some operator config that doesn't result in a new job depoyment) or changing of the upgrade mode

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gyfora Makes sense. Updated.

@luca-p-castelli
Copy link
Contributor Author

luca-p-castelli commented Mar 14, 2025

Thanks for the review @gyfora. I've made some updates based on your comments.

I have also:

  1. Made sure the e2e test fails without the fix (i.e. fails with what's currently in main)
    image

  2. I had to change the flink-examples JAR (just for the batch test) to a newer one since --execution-mode wasn't available in "https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.14.4/flink-examples-streaming_2.12-1.14.4.jar"

@luca-p-castelli luca-p-castelli requested a review from gyfora March 14, 2025 13:12
@luca-p-castelli
Copy link
Contributor Author

luca-p-castelli commented Mar 14, 2025

Full CI test matrix just finished running for our internal fork.

The batch tests for Flink versions 1.18 and under failed with the below error. I think it would be ok to just the run the batch tests for versions 1.19 and up. Thoughts?

Error: m2025-03-14 13:09:31,565 o.a.f.k.o.o.JobStatusObserver  [ERROR][default/flink-example-wordcount-batch] Job 063b25ead9e781bfaad8a07e2b587f38 failed with error: java.lang.NoSuchMethodError: 'org.apache.flink.streaming.api.datastream.DataStreamSource org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromData(java.lang.Object[])

@gyfora
Copy link
Contributor

gyfora commented Mar 14, 2025

Full CI test matrix just finished running for our internal fork.

The batch tests for Flink versions 1.18 and under failed with the below error. I think it would be ok to just the run the batch tests for versions 1.19 and up. Thoughts?

Error: m2025-03-14 13:09:31,565 o.a.f.k.o.o.JobStatusObserver  [ERROR][default/flink-example-wordcount-batch] Job 063b25ead9e781bfaad8a07e2b587f38 failed with error: java.lang.NoSuchMethodError: 'org.apache.flink.streaming.api.datastream.DataStreamSource org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromData(java.lang.Object[])

Makes sense

@gyfora gyfora merged commit c4d460b into apache:main Mar 16, 2025
121 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants