1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ scalastyle-output.xml
.classpath
.idea/*
!.idea/vcs.xml
+.vscode
.metadata
.settings
.project
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确

Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
-当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
有关 Kafka 兼容性的更多细节,请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)

{{< connector_artifact flink-connector-kafka kafka >}}
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading

Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).

{{< connector_artifact flink-connector-kafka kafka >}}
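For readers skimming only this hunk: the page being edited documents the DataStream Kafka connector, which is consumed through KafkaSource (and KafkaSink). A minimal usage sketch follows; the broker address, topic, group id, and class name are illustrative placeholders, not part of this change.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder broker, topic and group id.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("input-topic")
                        .setGroupId("my-group")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("Kafka source sketch");
    }
}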
1 change: 1 addition & 0 deletions flink-connector-kafka/.gitignore
@@ -0,0 +1 @@
+/latest_offset_resume_topic*
@@ -72,8 +72,8 @@ public Set<KafkaStream> getAllStreams() {
@Override
public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
try {
-return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
-.stream()
+return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get()
+.keySet().stream()
.collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Fetching all streams failed", e);
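Context for the hunk above: DescribeTopicsResult.all() has been deprecated in favor of allTopicNames(), which this hunk adopts and which still yields a future of Map<String, TopicDescription> keyed by topic name. A minimal sketch of the new-style call; the class and parameter names are placeholders.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;

final class TopicDescriptions {
    // allTopicNames() returns a KafkaFuture<Map<String, TopicDescription>> keyed by
    // topic name, matching what the deprecated all() accessor used to return.
    static Map<String, TopicDescription> describe(Admin admin, Collection<String> topicNames)
            throws InterruptedException, ExecutionException {
        return admin.describeTopics(new ArrayList<>(topicNames)).allTopicNames().get();
    }
}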
@@ -32,7 +32,7 @@

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {

@Override
public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-ListConsumerGroupOffsetsOptions options =
-new ListConsumerGroupOffsetsOptions()
-.topicPartitions(new ArrayList<>(partitions));
+ListConsumerGroupOffsetsSpec offsetsSpec =
+new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
try {
return adminClient
-.listConsumerGroupOffsets(groupId, options)
+.listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
.partitionsToOffsetAndMetadata()
.thenApply(
result -> {
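Context for the hunk above: ListConsumerGroupOffsetsOptions.topicPartitions() has been deprecated, and the hunk switches to the Map-of-specs overload of listConsumerGroupOffsets; partitionsToOffsetAndMetadata() with no argument still refers to the single group that was queried. A minimal sketch under those assumptions; the class and method names are placeholders.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

final class GroupOffsets {
    // The Map<groupId, spec> overload replaces the Options-based variant; passing a
    // single-entry map keeps the call scoped to one consumer group.
    static Map<TopicPartition, OffsetAndMetadata> committed(
            Admin admin, String groupId, Collection<TopicPartition> partitions)
            throws InterruptedException, ExecutionException {
        ListConsumerGroupOffsetsSpec spec =
                new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
        return admin.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
                .partitionsToOffsetAndMetadata()
                .get();
    }
}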
@@ -40,7 +40,11 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
private static final String INIT_KAFKA_RETRIES = "0";
private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+/**
+ * The delivery timeout has to be greater than the request timeout, as the latter is part of the
+ * former; the Kafka producer enforces this constraint when it validates its configuration.
+ */
+private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";

@RegisterExtension
public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
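Why 1500 rather than 1000: when its configuration is validated, the Kafka producer checks that delivery.timeout.ms is at least linger.ms + request.timeout.ms and rejects the producer otherwise. A small illustrative sketch with placeholder values mirroring the test constants:

import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

final class ProducerTimeouts {
    // delivery.timeout.ms must cover linger.ms + request.timeout.ms; otherwise
    // constructing the producer fails during configuration validation.
    static Properties timeoutProperties() {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "0");
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "1500");
        return props;
    }
}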
@@ -22,7 +22,7 @@
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;

import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@

import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -238,10 +239,10 @@ public static void setupCommittedOffsets(String topic)
Map<TopicPartition, OffsetAndMetadata> toVerify =
adminClient
.listConsumerGroupOffsets(
-GROUP_ID,
-new ListConsumerGroupOffsetsOptions()
-.topicPartitions(
-new ArrayList<>(committedOffsets.keySet())))
+Collections.singletonMap(
+GROUP_ID,
+new ListConsumerGroupOffsetsSpec()
+.topicPartitions(committedOffsets.keySet())))
.partitionsToOffsetAndMetadata()
.get();
assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);
@@ -282,8 +282,13 @@ public KafkaOffsetHandlerImpl() {

@Override
public Long getCommittedOffset(String topicName, int partition) {
-OffsetAndMetadata committed =
-offsetClient.committed(new TopicPartition(topicName, partition));
+TopicPartition topicPartition = new TopicPartition(topicName, partition);
+Map<TopicPartition, OffsetAndMetadata> committedMap =
+offsetClient.committed(Collections.singleton(topicPartition));
+if (committedMap == null) {
+return null;
+}
+OffsetAndMetadata committed = committedMap.get(topicPartition);
return (committed != null) ? committed.offset() : null;
}

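Context for the hunk above: the single-partition KafkaConsumer.committed(TopicPartition) overload was deprecated in favor of the Set-based variant, which returns a map and may report no committed offset (null) for a partition. A minimal sketch equivalent to the updated handler; the class and method names are placeholders.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

final class CommittedOffsets {
    // The Set-based committed() returns a map; a partition without a committed
    // offset yields no usable OffsetAndMetadata, so null is returned in that case.
    static Long committedOffset(Consumer<?, ?> consumer, String topic, int partition) {
        TopicPartition tp = new TopicPartition(topic, partition);
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        OffsetAndMetadata meta = committed == null ? null : committed.get(tp);
        return meta == null ? null : meta.offset();
    }
}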
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)

-- org.apache.kafka:kafka-clients:3.9.1
+- org.apache.kafka:kafka-clients:4.0.0
2 changes: 1 addition & 1 deletion pom.xml
@@ -54,7 +54,7 @@ under the License.
<!-- Main Dependencies -->
<confluent.version>7.9.2</confluent.version>
<flink.version>2.0.0</flink.version>
-<kafka.version>3.9.1</kafka.version>
+<kafka.version>4.0.0</kafka.version>

<!-- Other Dependencies -->
<avro.version>1.12.0</avro.version>
4 changes: 4 additions & 0 deletions tools/ci/log4j.properties
@@ -44,6 +44,10 @@ logger.flink.level = WARN
logger.flinkconnector.name = org.apache.flink.connector
logger.flinkconnector.level = INFO

+# Allow the LicenseChecker to log its output
+logger.licensechecker.name = org.apache.flink.tools.ci.licensecheck
+logger.licensechecker.level = INFO

# Kafka producer and consumer level
logger.kafka.name = org.apache.kafka
logger.kafka.level = OFF