Commit 688d21c

[FLINK-37583] Upgrade to Kafka 4.0.0 client.
Note that this update will make the connector incompatible with Kafka clusters running Kafka version 2.0 and older.

Signed-off-by: Thomas Cooper <[email protected]>
1 parent 0455935 commit 688d21c

File tree

9 files changed: +48 -42 lines changed

docs/content.zh/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确
 
 Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
 该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
-当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
 有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)
 
 {{< connector_artifact flink-connector-kafka kafka >}}

docs/content/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
 
 Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
 The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
 For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}

flink-connector-kafka/.gitignore

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+/latest_offset_resume_topic*

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ public Set<KafkaStream> getAllStreams() {
     @Override
     public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
         try {
-            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
+            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get().keySet()
                     .stream()
                     .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
         } catch (InterruptedException | ExecutionException e) {
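Kafka 4.0 removes the deprecated DescribeTopicsResult.all() accessor used on the removed line above; allTopicNames(), available since Kafka 3.1, returns the same future of a map keyed by topic name. A minimal standalone sketch of the migrated call, assuming an AdminClient named admin and illustrative topic names:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

class DescribeTopicsSketch {
    // Look up topic descriptions; allTopicNames() replaces the removed all().
    static Map<String, TopicDescription> describe(AdminClient admin)
            throws InterruptedException, ExecutionException {
        return admin.describeTopics(List.of("topic-a", "topic-b"))
                .allTopicNames() // KafkaFuture<Map<String, TopicDescription>>
                .get();
    }
}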

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

Lines changed: 4 additions & 5 deletions
@@ -32,7 +32,7 @@
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
 
        @Override
        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-            ListConsumerGroupOffsetsOptions options =
-                    new ListConsumerGroupOffsetsOptions()
-                            .topicPartitions(new ArrayList<>(partitions));
+            ListConsumerGroupOffsetsSpec offsetsSpec =
+                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
            try {
                return adminClient
-                        .listConsumerGroupOffsets(groupId, options)
+                        .listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
                        .partitionsToOffsetAndMetadata()
                        .thenApply(
                                result -> {
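The two-argument listConsumerGroupOffsets(groupId, options) overload removed above no longer exists in Kafka 4.0; the surviving overload takes a map from group id to ListConsumerGroupOffsetsSpec, so a single group is passed as a singleton map. A minimal sketch of that pattern, assuming an AdminClient named admin:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

class GroupOffsetsSketch {
    // Fetch committed offsets for one group, restricted to the given partitions.
    static Map<TopicPartition, OffsetAndMetadata> committed(
            AdminClient admin, String groupId, Collection<TopicPartition> partitions)
            throws InterruptedException, ExecutionException {
        ListConsumerGroupOffsetsSpec spec =
                new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
        return admin.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
                .partitionsToOffsetAndMetadata() // valid because only one group was queried
                .get();
    }
}

The map-based form can also batch several groups in one round trip, which is why it replaced the single-group overload.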

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
     private static final String INIT_KAFKA_RETRIES = "0";
     private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
     private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";
 
     @RegisterExtension
     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java

Lines changed: 5 additions & 4 deletions
@@ -22,7 +22,7 @@
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -238,10 +239,10 @@ public static void setupCommittedOffsets(String topic)
        Map<TopicPartition, OffsetAndMetadata> toVerify =
                adminClient
                        .listConsumerGroupOffsets(
+                                Collections.singletonMap(
                                        GROUP_ID,
-                                new ListConsumerGroupOffsetsOptions()
-                                        .topicPartitions(
-                                                new ArrayList<>(committedOffsets.keySet())))
+                                        new ListConsumerGroupOffsetsSpec()
+                                                .topicPartitions(committedOffsets.keySet())))
                        .partitionsToOffsetAndMetadata()
                        .get();
        assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 7 additions & 2 deletions
@@ -282,8 +282,13 @@ public KafkaOffsetHandlerImpl() {
 
        @Override
        public Long getCommittedOffset(String topicName, int partition) {
-            OffsetAndMetadata committed =
-                    offsetClient.committed(new TopicPartition(topicName, partition));
+            TopicPartition topicPartition = new TopicPartition(topicName, partition);
+            Map<TopicPartition, OffsetAndMetadata> committedMap =
+                    offsetClient.committed(Collections.singleton(topicPartition));
+            if (committedMap == null) {
+                return null;
+            }
+            OffsetAndMetadata committed = committedMap.get(topicPartition);
            return (committed != null) ? committed.offset() : null;
        }

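Kafka 4.0 likewise drops the single-partition KafkaConsumer.committed(TopicPartition); the Set-based overload, available since Kafka 2.4, returns a map in which a partition with no committed offset maps to null, which is why the patched code guards both the map and the entry. A minimal sketch of the same lookup, assuming a configured KafkaConsumer named consumer:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

class CommittedOffsetLookup {
    // Read one partition's committed offset via the Set-based API.
    static Long committedOffset(KafkaConsumer<?, ?> consumer, String topic, int partition) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        // The map value is null when the group has no committed offset for tp.
        OffsetAndMetadata meta = (committed != null) ? committed.get(tp) : null;
        return (meta != null) ? meta.offset() : null;
    }
}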
pom.xml

Lines changed: 27 additions & 27 deletions
@@ -28,10 +28,10 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>flink-connector-kafka-parent</artifactId>
-    <version>4.0-SNAPSHOT</version>
-    <name>Flink : Connectors : Kafka : Parent</name>
-    <packaging>pom</packaging>
-    <inceptionYear>2022</inceptionYear>
+    <version>4.0-SNAPSHOT</version>
+    <name>Flink : Connectors : Kafka : Parent</name>
+    <packaging>pom</packaging>
+    <inceptionYear>2022</inceptionYear>
 
     <scm>
         <url>https://github.com/apache/flink-connector-kafka</url>
@@ -48,33 +48,33 @@ under the License.
         <module>flink-python</module>
     </modules>
 
-    <properties>
-        <flink.version>2.0.0</flink.version>
-        <kafka.version>3.9.0</kafka.version>
-        <confluent.version>7.8.2</confluent.version>
-
-        <jackson-bom.version>2.16.2</jackson-bom.version>
-        <junit4.version>4.13.2</junit4.version>
-        <junit5.version>5.9.1</junit5.version>
-        <assertj.version>3.23.1</assertj.version>
-        <testcontainers.version>1.17.2</testcontainers.version>
-        <mockito.version>3.4.6</mockito.version>
-        <powermock.version>2.0.9</powermock.version>
-        <hamcrest.version>1.3</hamcrest.version>
-        <byte-buddy.version>1.12.10</byte-buddy.version>
-        <commons-cli.version>1.5.0</commons-cli.version>
-        <scala.binary.version>2.12</scala.binary.version>
-        <scala-reflect.version>2.12.19</scala-reflect.version>
-        <scala-library.version>2.12.19</scala-library.version>
-        <snappy-java.version>1.1.10.5</snappy-java.version>
-        <avro.version>1.11.4</avro.version>
-        <guava.version>32.1.2-jre</guava.version>
+    <properties>
+        <flink.version>2.0.0</flink.version>
+        <kafka.version>4.0.0</kafka.version>
+        <confluent.version>7.8.2</confluent.version>
+
+        <jackson-bom.version>2.16.2</jackson-bom.version>
+        <junit4.version>4.13.2</junit4.version>
+        <junit5.version>5.9.1</junit5.version>
+        <assertj.version>3.23.1</assertj.version>
+        <testcontainers.version>1.17.2</testcontainers.version>
+        <mockito.version>3.4.6</mockito.version>
+        <powermock.version>2.0.9</powermock.version>
+        <hamcrest.version>1.3</hamcrest.version>
+        <byte-buddy.version>1.12.10</byte-buddy.version>
+        <commons-cli.version>1.5.0</commons-cli.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <scala-reflect.version>2.12.20</scala-reflect.version>
+        <scala-library.version>2.12.20</scala-library.version>
+        <snappy-java.version>1.1.10.5</snappy-java.version>
+        <avro.version>1.11.4</avro.version>
+        <guava.version>32.1.2-jre</guava.version>
 
         <japicmp.skip>false</japicmp.skip>
         <japicmp.referenceVersion>1.17.0</japicmp.referenceVersion>
 
-        <slf4j.version>1.7.36</slf4j.version>
-        <log4j.version>2.17.1</log4j.version>
+        <slf4j.version>1.7.36</slf4j.version>
+        <log4j.version>2.24.3</log4j.version>
 
         <flink.parent.artifactId>flink-connector-kafka-parent</flink.parent.artifactId>
 