Skip to content

Commit a32aca4

Browse files
committed
[FLINK-37583] Upgrade to Kafka 4.0.0 client.
Note that this update will make the connector incompatible with Kafka clusters running Kafka version 2.0 and older.

Signed-off-by: Thomas Cooper <[email protected]>
1 parent ccccaa8 commit a32aca4

File tree

11 files changed

+43
-31
lines changed

11 files changed

+43
-31
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ scalastyle-output.xml
44
.classpath
55
.idea/*
66
!.idea/vcs.xml
7+
.vscode
78
.metadata
89
.settings
910
.project

docs/content.zh/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确
3333

3434
Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
3535
该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
36-
当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
36+
当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
3737
有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)
3838

3939
{{< connector_artifact flink-connector-kafka kafka >}}

docs/content/docs/connectors/datastream/kafka.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
3333

3434
Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
3535
The version of the client it uses may change between Flink releases.
36-
Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
36+
Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
3737
For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
3838

3939
{{< connector_artifact flink-connector-kafka kafka >}}

flink-connector-kafka/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/latest_offset_resume_topic*

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ public Set<KafkaStream> getAllStreams() {
7272
@Override
7373
public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
7474
try {
75-
return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
76-
.stream()
75+
return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get()
76+
.keySet().stream()
7777
.collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
7878
} catch (InterruptedException | ExecutionException e) {
7979
throw new RuntimeException("Fetching all streams failed", e);

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
import org.apache.kafka.clients.admin.AdminClient;
3434
import org.apache.kafka.clients.admin.KafkaAdminClient;
35-
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
35+
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
3636
import org.apache.kafka.clients.admin.ListOffsetsResult;
3737
import org.apache.kafka.clients.admin.OffsetSpec;
3838
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
533533

534534
@Override
535535
public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
536-
ListConsumerGroupOffsetsOptions options =
537-
new ListConsumerGroupOffsetsOptions()
538-
.topicPartitions(new ArrayList<>(partitions));
536+
ListConsumerGroupOffsetsSpec offsetsSpec =
537+
new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
539538
try {
540539
return adminClient
541-
.listConsumerGroupOffsets(groupId, options)
540+
.listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
542541
.partitionsToOffsetAndMetadata()
543542
.thenApply(
544543
result -> {

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
4040
private static final String INIT_KAFKA_RETRIES = "0";
4141
private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
4242
private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
43-
private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
43+
/**
44+
* The delivery timeout has to be greater than the request timeout as the latter is part of the
45+
* former; this is enforced by a runtime configuration check when the Kafka producer is created.
46+
*/
47+
private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";
4448

4549
@RegisterExtension
4650
public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
2323

2424
import org.apache.kafka.clients.admin.AdminClient;
25-
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
25+
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
2626
import org.apache.kafka.clients.admin.RecordsToDelete;
2727
import org.apache.kafka.clients.consumer.ConsumerConfig;
2828
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@
3636

3737
import java.util.ArrayList;
3838
import java.util.Collection;
39+
import java.util.Collections;
3940
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
@@ -238,10 +239,10 @@ public static void setupCommittedOffsets(String topic)
238239
Map<TopicPartition, OffsetAndMetadata> toVerify =
239240
adminClient
240241
.listConsumerGroupOffsets(
241-
GROUP_ID,
242-
new ListConsumerGroupOffsetsOptions()
243-
.topicPartitions(
244-
new ArrayList<>(committedOffsets.keySet())))
242+
Collections.singletonMap(
243+
GROUP_ID,
244+
new ListConsumerGroupOffsetsSpec()
245+
.topicPartitions(committedOffsets.keySet())))
245246
.partitionsToOffsetAndMetadata()
246247
.get();
247248
assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,13 @@ public KafkaOffsetHandlerImpl() {
282282

283283
@Override
284284
public Long getCommittedOffset(String topicName, int partition) {
285-
OffsetAndMetadata committed =
286-
offsetClient.committed(new TopicPartition(topicName, partition));
285+
TopicPartition topicPartition = new TopicPartition(topicName, partition);
286+
Map<TopicPartition, OffsetAndMetadata> committedMap =
287+
offsetClient.committed(Collections.singleton(topicPartition));
288+
if (committedMap == null) {
289+
return null;
290+
}
291+
OffsetAndMetadata committed = committedMap.get(topicPartition);
287292
return (committed != null) ? committed.offset() : null;
288293
}
289294

flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
66

77
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
88

9-
- org.apache.kafka:kafka-clients:3.9.1
9+
- org.apache.kafka:kafka-clients:4.0.0

0 commit comments

Comments
 (0)