
Conversation

Contributor

@tomncooper commented Mar 28, 2025

Kafka Client version 4.0.0 has been released. It brings many performance improvements and security fixes. This version is largely backwards compatible with older Kafka versions; however, as noted in the release notes:

Old protocol API versions have been removed. Users should ensure brokers are version 2.1 or higher before upgrading Java clients (including Connect and Kafka Streams which use the clients internally) to 4.0.

Therefore, moving to this version will mean that the connector cannot be used with Kafka clusters running Kafka version 2.0 or older. Hopefully, users have upgraded beyond this point by now, but it will need to be stated clearly in the release notes. This PR also advances the connector version to 5.0-SNAPSHOT to recognise this breaking change.

The new Kafka client now enforces the requirement that the producer's delivery timeout be longer than the request timeout (which is part of the overall delivery timeout), rejecting invalid combinations when the producer is configured. The relevant configurations have therefore been updated.
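
As an illustrative sketch only (the values below are examples, not the connector's actual settings), a producer configuration that satisfies this check looks roughly like:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The 4.x client rejects an explicitly set delivery.timeout.ms that is
        // smaller than request.timeout.ms + linger.ms.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);   // per-request timeout
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);               // batching delay
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000); // must cover the two values above
    }
}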

@MartijnVisser
Contributor

@tomncooper I suspect that this will fail, given that the Flink Kafka connector relies on reflection for the two-phase commit part.

@tomncooper
Contributor Author

@MartijnVisser It passed all the tests in my local repo (using mvn verify)? Are there other tests I should run (perhaps in the main Flink repo) to verify?

@MartijnVisser
Contributor

MartijnVisser commented Mar 31, 2025

Are there other tests I should run (perhaps in the main Flink repo) to verify?

Yeah, mvn clean install -Dflink.version=2.0.0 -Dscala-2.12 -Prun-end-to-end-tests -DdistDir=/yourpathto/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties is what CI also runs (and is unfortunately what it failed with).

@tomncooper
Contributor Author

So I have run:

mvn clean install -Prun-end-to-end-tests -Dflink.version=2.0.0 -Dscala-2.12 -DdistDir=$HOME/tools/flink/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties

With both Java 17 and 21 locally and all tests are passing 🤔

@MartijnVisser
Contributor

So I have run:

mvn clean install -Prun-end-to-end-tests -Dflink.version=2.0.0 -Dscala-2.12 -DdistDir=$HOME/tools/flink/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties

With both Java 17 and 21 locally and all tests are passing 🤔

Hmmm, it could be a flaky test, but it's hard to pin down whether it was always flaky or only flaky with this PR.

@tomncooper
Contributor Author

Ok, that run passed the e2e tests but failed on the licence check.

@tomncooper
Contributor Author

Hummm, so I assume this is the test that is failing in CI. But when I run the same locally:

mvn exec:java@check-license -N -Dexec.args="/tmp/flink-maven-build-output $(pwd) /tmp/flink-validation-deployment" -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties

It passes?

@tomncooper
Contributor Author

I rebased after the merge of #138 and that restarted the CI.

It seems that KafkaWriterFaultToleranceITCase is flaky, as it passes locally with Java 17 and 21 and has previously passed CI on this PR.

@davidradl

@tomncooper @MartijnVisser @AHeise I suggest we move to the Kafka 4.0.0 client when we do Kafka connector v4 for Flink 2.
WDYT? (Assuming we can sort out the tests.)

I think there is a case to say we do not backport Kafka client v4.0.0 support to Kafka connector v3.3 or 3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream.

@tomncooper
Contributor Author

@tomncooper @MartijnVisser @AHeise I suggest we move to the Kafka 4.0.0 client when we do Kafka connector v4 for Flink 2. WDYT? (Assuming we can sort out the tests.)

I think there is a case to say we do not backport Kafka client v4.0.0 support to Kafka connector v3.3 or 3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream.

@davidradl As per the discussion on the dev mailing list, we are going to move ahead with a Connector 4.0 release with Flink 2.0 and Kafka 3.9.0. We can then do point release updates.

I still think that, given this PR would drop support for older Kafka versions, it should be part of a further major version bump (i.e. 5.0), but we can have that discussion when the time comes to merge this.

@tomncooper
Contributor Author

tomncooper commented Jul 3, 2025

In order for the CI to pass, this PR will need to be merged first.

@tomncooper requested a review from davidradl on July 4, 2025 09:36
@tomncooper force-pushed the kafka-4.0.0 branch 4 times, most recently from a32aca4 to 655b3c7 on July 14, 2025 14:22
@fapaul requested a review from fapaul and removed the review request for davidradl on July 17, 2025 07:04
@fapaul
Contributor

fapaul commented Jul 22, 2025

I am not yet convinced why this PR bumps the major version of the connector.

I still think that, given this PR would drop support for older Kafka versions, it should be part of a further major version bump (i.e. 5.0), but we can have that discussion when the time comes to merge this.

Is this true? AFAICT the proposed change only increases the source-compatible version, but the new client is still able to talk to older Kafka clusters, since the Kafka clients are backwards compatible.
Users should still be safe to upgrade to the new connector version without changing the rest of their infrastructure, e.g. a Flink cluster or Kafka cluster upgrade.

@tomncooper
Contributor Author

@fapaul Usually the clients are backwards compatible. However, Kafka 4.0 dropped support for many APIs and the older log message formats. As per the release notes:

Old protocol API versions have been removed. Users should ensure brokers are version 2.1 or higher before upgrading Java clients (including Connect and Kafka Streams which use the clients internally) to 4.0.

The 4.0 compatibility matrix further lays out the server version compatibility and shows that ideally, users of the Kafka 4.0 clients should be running Kafka 3.0 or higher.

I am happy with the version being whatever the community decides. My personal opinion is that we should, by bumping the major version of the connector, telegraph the fact that the connector version containing this PR will drop support for brokers on version 2.0 or older (and offer only limited support for 2.1-2.8).
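
To make the impact concrete, here is a rough, hypothetical sketch (the broker address and topic are made up, and this code is not part of the PR) of what a user on a pre-2.1 broker could expect with the 4.x client; the send is expected to fail, typically surfacing an UnsupportedVersionException:

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringSerializer;

public class OldBrokerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "old-broker:9092"); // hypothetical pre-2.1 broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
        } catch (ExecutionException e) {
            // Against a <=2.0 broker the 4.x client cannot negotiate a supported
            // protocol version, so the send is expected to fail.
            if (e.getCause() instanceof UnsupportedVersionException) {
                System.err.println("Broker too old for the 4.x client: " + e.getCause().getMessage());
            }
        }
    }
}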

@fapaul
Contributor

fapaul commented Jul 23, 2025

Thanks for the thorough explanation. In summary, in addition to dropping support for the 0.x versions (which has been done in the past already), we now also drop support for everything up to and including 2.0.

I am okay with the major version bump then. Can you look at the last failing tests to get this PR merged?

Also, I noticed the docs are slightly out of date: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#dependency still states

Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For details on Kafka compatibility, please refer to the official Kafka documentation.

Can you also update these docs to reflect that with this change, we expect the Kafka broker to be at least version 2.1?

@tomncooper
Contributor Author

tomncooper commented Jul 23, 2025

@fapaul This PR already updated the docs to say Kafka 2.1.0 or later. Do you think it needs a more in-depth description? I am happy to add that, but I will need to create a JIRA for the Chinese language changes.

I will also try and figure out why the licence checker is unhappy. It would be extremely helpful if that tool actually told you what the issue was!

@fapaul
Contributor

fapaul commented Jul 23, 2025

@fapaul This PR already updated the docs to say Kafka 2.1.0 or later. Do you think it needs a more in-depth description? I am happy to add that, but I will need to create a JIRA for the Chinese language changes.

Sorry, I missed that; all good with the docs.

The log output is definitely strange. Usually there are logs for every violation. I suspect this change (1b02ce8) changed the default logging behavior by setting the Flink log level to WARN, which also affects the LicenseChecker.

@tomncooper
Contributor Author

Ok, I am stumped. Locally I run (with Maven 3.8.6 and Java 17):

mvn clean deploy -U -B --no-transfer-progress -Dflink.version=2.0.0 \
    -DaltDeploymentRepository=validation_repository::default::file:/tmp/flink-validation-deployment \
    -Dscala-2.12 \
    -Prun-end-to-end-tests -DdistDir="$HOME/tools/flink/flink-2.0.0" \
    -Dflink.convergence.phase=install -Pcheck-convergence \
    -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \
    -Dlog4j.configurationFile="file://$(pwd)/tools/ci/log4j.properties" \
   | tee /tmp/mvn_build_output.out

Then the licence checker:

mvn -U -B --no-transfer-progress -Dflink.version=2.0.0 \
    exec:java@check-license -N \
    -Dexec.args="/tmp/mvn_build_output.out $(pwd) /tmp/flink-validation-deployment" \
    -Dhttp.keepAlive=false \
    -Dmaven.wagon.http.pool=false \
    -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \
    -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties

Which, as far as I can tell, is what the CI runs, and it passes?

@tomncooper
Contributor Author

Ok, I have no idea why, but rebasing on main has led to green tests on my fork.

I added an override to the CI log4j config to make sure the licence checker is able to output its logs.
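
Purely as an illustration of the idea (the actual change was to the CI log4j properties file, and the logger package name here is a guess), the programmatic equivalent with Log4j 2 would be along these lines:

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class LicenseCheckerLoggingSketch {
    public static void main(String[] args) {
        // Keep the noisy Flink/Kafka loggers at WARN, but let the licence checker
        // report its findings at INFO. The package name is an assumption.
        Configurator.setRootLevel(Level.WARN);
        Configurator.setLevel("org.apache.flink.tools.ci", Level.INFO);
    }
}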

@fapaul Are you able to kick the CI on this PR?

@tomncooper requested a review from davidradl on July 23, 2025 16:04
Contributor

@fapaul left a comment


Can you squash the commits on this PR and rebase on the latest master?

Note that this update will make the connector incompatible with Kafka
clusters running Kafka version 2.0 and older.

Signed-off-by: Thomas Cooper <[email protected]>
@tomncooper
Contributor Author

@fapaul I squashed and rebased.

However, before we merge this, I think it would be good to do a Kafka connector 4.1 release with the 3.9.1 client. That way, if there are any users running older Kafka versions, they could still use Flink 2.0 with a fully patched Kafka client (3.9.0 has a critical CVE).

I raised this on the dev mailing list a while ago, but didn't get any feedback. WDYT?

@tomncooper
Contributor Author

@fapaul Now that the 4.0.1 release of the connector is out, are we ok to merge this PR?

@fapaul
Contributor

fapaul commented Aug 29, 2025

Sorry for the late reply. Yes, I will merge the PR now.

@fapaul merged commit ac98345 into apache:main on Aug 29, 2025
7 checks passed