Commit eaa6523
[FLINK-37583] Upgrade to Kafka 4.0.0 client.

Note that this update will make the connector incompatible with Kafka clusters running Kafka version 2.0 and older.

Signed-off-by: Thomas Cooper <[email protected]>
1 parent 1dc5c1a commit eaa6523

File tree

9 files changed: +26 −20 lines changed

docs/content.zh/docs/connectors/datastream/kafka.md (Chinese documentation; translated here)

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector that uses exactly-once
 
 Apache Flink ships with a universal Kafka connector, which tries to keep in sync with the latest version of the Kafka client.
 The Kafka client version used by this connector may change between Flink releases.
-The current Kafka client is backward compatible with Kafka brokers version 0.10.0 or later.
+The current Kafka client is backward compatible with Kafka brokers version 2.1.0 or later.
 For more details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}
```

docs/content/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
 
 Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
 The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
 For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}
```

flink-connector-kafka/.gitignore

Lines changed: 1 addition & 0 deletions

```diff
@@ -0,0 +1 @@
+/latest_offset_resume_topic*
```

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -72,7 +72,7 @@ public Set<KafkaStream> getAllStreams() {
     @Override
     public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
        try {
-            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
+            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get().keySet()
                     .stream()
                     .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
        } catch (InterruptedException | ExecutionException e) {
```
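
Note: `DescribeTopicsResult.all()` is no longer available in the 4.0 Admin client; `allTopicNames()` completes with the same `Map<String, TopicDescription>` keyed by topic name. A minimal standalone sketch of the new call (topic names and bootstrap address are placeholders, not from this commit):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicsByName {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // allTopicNames() replaces the old all(); it yields a future
            // Map<String, TopicDescription> keyed by topic name.
            Map<String, TopicDescription> topics =
                    admin.describeTopics(List.of("orders", "payments"))
                            .allTopicNames()
                            .get();
            topics.forEach((name, description) ->
                    System.out.println(name + ": " + description.partitions().size() + " partitions"));
        }
    }
}
```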

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

Lines changed: 4 additions & 5 deletions

```diff
@@ -32,7 +32,7 @@
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
 
        @Override
        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-            ListConsumerGroupOffsetsOptions options =
-                    new ListConsumerGroupOffsetsOptions()
-                            .topicPartitions(new ArrayList<>(partitions));
+            ListConsumerGroupOffsetsSpec offsetsSpec =
+                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
            try {
                return adminClient
-                        .listConsumerGroupOffsets(groupId, options)
+                        .listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
                        .partitionsToOffsetAndMetadata()
                        .thenApply(
                                result -> {
```
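
Note: the `listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)` overload is gone in the 4.0 Admin API; the replacement takes a map of group ids to `ListConsumerGroupOffsetsSpec`, and the no-argument `partitionsToOffsetAndMetadata()` is valid when exactly one group was queried. A runnable sketch with placeholder group/topic names (the same pattern is applied in `KafkaSourceTestEnv` below):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ListGroupOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // One spec per group id; the spec limits the lookup to the given partitions.
            ListConsumerGroupOffsetsSpec spec =
                    new ListConsumerGroupOffsetsSpec()
                            .topicPartitions(Set.of(new TopicPartition("orders", 0)));
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets(Collections.singletonMap("my-group", spec))
                            // No-arg variant is allowed since a single group was queried.
                            .partitionsToOffsetAndMetadata()
                            .get();
            offsets.forEach((tp, meta) ->
                    System.out.println(tp + " -> " + (meta == null ? "none" : meta.offset())));
        }
    }
}
```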

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -40,7 +40,7 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
     private static final String INIT_KAFKA_RETRIES = "0";
     private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
     private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";
 
     @RegisterExtension
     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
```
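
Note: the bump to 1500 ms is likely forced by the producer's construction-time check that `delivery.timeout.ms >= linger.ms + request.timeout.ms`; with the 4.0 client's default `linger.ms` reportedly raised above 0 (KIP-1030), an explicit 1000 ms delivery timeout no longer clears a 1000 ms request timeout. A hedged sketch of the constraint (bootstrap address is a placeholder):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeliveryTimeoutCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        // KafkaProducer validates delivery.timeout.ms >= linger.ms + request.timeout.ms
        // at construction; with a non-zero default linger.ms, an explicit 1000 here
        // would throw org.apache.kafka.common.config.ConfigException, while 1500 passes.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1500");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            System.out.println("producer configured");
        }
    }
}
```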

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java

Lines changed: 5 additions & 4 deletions

```diff
@@ -22,7 +22,7 @@
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -238,10 +239,10 @@ public static void setupCommittedOffsets(String topic)
        Map<TopicPartition, OffsetAndMetadata> toVerify =
                adminClient
                        .listConsumerGroupOffsets(
+                                Collections.singletonMap(
                                        GROUP_ID,
-                                new ListConsumerGroupOffsetsOptions()
-                                        .topicPartitions(
-                                                new ArrayList<>(committedOffsets.keySet())))
+                                        new ListConsumerGroupOffsetsSpec()
+                                                .topicPartitions(committedOffsets.keySet())))
                        .partitionsToOffsetAndMetadata()
                        .get();
        assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);
```

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 7 additions & 2 deletions

```diff
@@ -292,8 +292,13 @@ public KafkaOffsetHandlerImpl() {
 
        @Override
        public Long getCommittedOffset(String topicName, int partition) {
-            OffsetAndMetadata committed =
-                    offsetClient.committed(new TopicPartition(topicName, partition));
+            TopicPartition topicPartition = new TopicPartition(topicName, partition);
+            Map<TopicPartition, OffsetAndMetadata> committedMap =
+                    offsetClient.committed(Collections.singleton(topicPartition));
+            if (committedMap == null) {
+                return null;
+            }
+            OffsetAndMetadata committed = committedMap.get(topicPartition);
            return (committed != null) ? committed.offset() : null;
        }
```
pom.xml

Lines changed: 5 additions & 5 deletions

```diff
@@ -27,7 +27,7 @@ under the License.
 
     <modelVersion>4.0.0</modelVersion>
 
-	<artifactId>flink-connector-kafka-parent</artifactId>
+    <artifactId>flink-connector-kafka-parent</artifactId>
     <version>4.0-SNAPSHOT</version>
     <name>Flink : Connectors : Kafka : Parent</name>
     <packaging>pom</packaging>
@@ -50,7 +50,7 @@ under the License.
 
     <properties>
        <flink.version>2.0.0</flink.version>
-       <kafka.version>3.9.0</kafka.version>
+       <kafka.version>4.0.0</kafka.version>
        <confluent.version>7.8.2</confluent.version>
 
        <jackson-bom.version>2.16.2</jackson-bom.version>
@@ -64,8 +64,8 @@ under the License.
        <byte-buddy.version>1.12.10</byte-buddy.version>
        <commons-cli.version>1.5.0</commons-cli.version>
        <scala.binary.version>2.12</scala.binary.version>
-       <scala-reflect.version>2.12.19</scala-reflect.version>
-       <scala-library.version>2.12.19</scala-library.version>
+       <scala-reflect.version>2.12.20</scala-reflect.version>
+       <scala-library.version>2.12.20</scala-library.version>
        <snappy-java.version>1.1.10.5</snappy-java.version>
        <avro.version>1.11.4</avro.version>
        <guava.version>32.1.2-jre</guava.version>
@@ -74,7 +74,7 @@ under the License.
        <japicmp.referenceVersion>1.17.0</japicmp.referenceVersion>
 
        <slf4j.version>1.7.36</slf4j.version>
-       <log4j.version>2.17.1</log4j.version>
+       <log4j.version>2.24.3</log4j.version>
 
        <flink.parent.artifactId>flink-connector-kafka-parent</flink.parent.artifactId>
```