diff --git a/.gitignore b/.gitignore
index 485c27aa6..43b09a547 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ scalastyle-output.xml
 .classpath
 .idea/*
 !.idea/vcs.xml
+.vscode
 .metadata
 .settings
 .project
diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md
index 4a90ece31..a80d3371d 100644
--- a/docs/content.zh/docs/connectors/datastream/kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器使用精确
 
 Apache Flink 集成了通用的 Kafka 连接器，它会尽力与 Kafka client 的最新版本保持同步。
 该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
-当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+当前 Kafka client 向后兼容 2.1.0 或更高版本的 Kafka broker。
 有关 Kafka 兼容性的更多细节，请参考 [Kafka 官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
 
diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md
index bddcefec9..d4b1ba22a 100644
--- a/docs/content/docs/connectors/datastream/kafka.md
+++ b/docs/content/docs/connectors/datastream/kafka.md
@@ -33,7 +33,7 @@ Flink provides an [Apache Kafka](https://kafka.apache.org) connector for reading
 
 Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client.
 The version of the client it uses may change between Flink releases.
-Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.
+Modern Kafka clients are backwards compatible with broker versions 2.1.0 or later.
 For details on Kafka compatibility, please refer to the official [Kafka documentation](https://kafka.apache.org/protocol.html#protocol_compatibility).
 
 {{< connector_artifact flink-connector-kafka kafka >}}
diff --git a/flink-connector-kafka/.gitignore b/flink-connector-kafka/.gitignore
new file mode 100644
index 000000000..69e9a73be
--- /dev/null
+++ b/flink-connector-kafka/.gitignore
@@ -0,0 +1 @@
+/latest_offset_resume_topic*
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
index 6cef3ab31..1f2f0fd18 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
@@ -72,8 +72,8 @@ public Set<String> getAllStreams() {
     @Override
     public Map<String, KafkaStream> describeStreams(Collection<String> streamIds) {
         try {
-            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).all().get().keySet()
-                    .stream()
+            return getAdminClient().describeTopics(new ArrayList<>(streamIds)).allTopicNames().get()
+                    .keySet().stream()
                     .collect(Collectors.toMap(topic -> topic, this::createKafkaStream));
         } catch (InterruptedException | ExecutionException e) {
             throw new RuntimeException("Fetching all streams failed", e);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 10025fa2a..f3058193c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -32,7 +32,7 @@
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -533,12 +533,11 @@ public PartitionOffsetsRetrieverImpl(AdminClient adminClient, String groupId) {
 
         @Override
         public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
-            ListConsumerGroupOffsetsOptions options =
-                    new ListConsumerGroupOffsetsOptions()
-                            .topicPartitions(new ArrayList<>(partitions));
+            ListConsumerGroupOffsetsSpec offsetsSpec =
+                    new ListConsumerGroupOffsetsSpec().topicPartitions(partitions);
             try {
                 return adminClient
-                        .listConsumerGroupOffsets(groupId, options)
+                        .listConsumerGroupOffsets(Collections.singletonMap(groupId, offsetsSpec))
                         .partitionsToOffsetAndMetadata()
                         .thenApply(
                                 result -> {
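The two Java hunks above are the core kafka-clients 4.0 API migration: `DescribeTopicsResult.all()` was removed in favor of `allTopicNames()`, and the `listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)` overload was removed in favor of the map-of-specs variant. A minimal standalone sketch of the new calls, assuming a reachable broker; the bootstrap address, topic, and group id are hypothetical:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AdminClient40Sketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        try (Admin admin = Admin.create(props)) {
            // 4.0: DescribeTopicsResult.all() is gone; allTopicNames() returns the same
            // KafkaFuture<Map<String, TopicDescription>> keyed by topic name.
            Map<String, TopicDescription> topics =
                    admin.describeTopics(Collections.singletonList("my-topic")) // hypothetical topic
                            .allTopicNames()
                            .get();

            // 4.0: the (groupId, options) overload is gone; pass a map from group id to
            // ListConsumerGroupOffsetsSpec. An empty spec means "all partitions of the group".
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets(
                                    Collections.singletonMap(
                                            "my-group", new ListConsumerGroupOffsetsSpec()))
                            .partitionsToOffsetAndMetadata()
                            .get();

            System.out.println(topics.keySet() + " -> " + offsets);
        }
    }
}
```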
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
index 4a7d25c63..86fc5846f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterFaultToleranceITCase.java
@@ -40,7 +40,11 @@ public class KafkaWriterFaultToleranceITCase extends KafkaWriterTestBase {
     private static final String INIT_KAFKA_RETRIES = "0";
     private static final String INIT_KAFKA_REQUEST_TIMEOUT_MS = "1000";
    private static final String INIT_KAFKA_MAX_BLOCK_MS = "1000";
-    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1000";
+    /**
+     * The delivery timeout has to be greater than the request timeout as the latter is part of the
+     * former and this is enforced by a compile time check.
+     */
+    private static final String INIT_KAFKA_DELIVERY_TIMEOUT_MS = "1500";
 
     @RegisterExtension
     public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
index d82425f57..ed4a10ef4 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceTestEnv.java
@@ -22,7 +22,7 @@
 import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,6 +36,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -238,10 +239,10 @@ public static void setupCommittedOffsets(String topic)
         Map<TopicPartition, OffsetAndMetadata> toVerify =
                 adminClient
                         .listConsumerGroupOffsets(
-                                GROUP_ID,
-                                new ListConsumerGroupOffsetsOptions()
-                                        .topicPartitions(
-                                                new ArrayList<>(committedOffsets.keySet())))
+                                Collections.singletonMap(
+                                        GROUP_ID,
+                                        new ListConsumerGroupOffsetsSpec()
+                                                .topicPartitions(committedOffsets.keySet())))
                         .partitionsToOffsetAndMetadata()
                         .get();
         assertThat(toVerify).as("The offsets are not committed").isEqualTo(committedOffsets);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0a65e9126..9c3604677 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -282,8 +282,13 @@ public KafkaOffsetHandlerImpl() {
 
         @Override
         public Long getCommittedOffset(String topicName, int partition) {
-            OffsetAndMetadata committed =
-                    offsetClient.committed(new TopicPartition(topicName, partition));
+            TopicPartition topicPartition = new TopicPartition(topicName, partition);
+            Map<TopicPartition, OffsetAndMetadata> committedMap =
+                    offsetClient.committed(Collections.singleton(topicPartition));
+            if (committedMap == null) {
+                return null;
+            }
+            OffsetAndMetadata committed = committedMap.get(topicPartition);
             return (committed != null) ? committed.offset() : null;
         }
 
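`Consumer.committed(TopicPartition)` is gone in 4.0 as well; only the `Set`-based overload remains, and it returns a map in which partitions without a committed offset may be absent or mapped to null, hence the null guards in the hunk above. A minimal sketch of the same null-safe lookup, assuming an already-configured consumer instance:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

public final class CommittedOffsetLookup {
    private CommittedOffsetLookup() {}

    /** Returns the committed offset for one partition, or null if none exists. */
    public static Long committedOffset(Consumer<?, ?> consumer, String topic, int partition) {
        TopicPartition tp = new TopicPartition(topic, partition);
        // 4.0: committed(TopicPartition) was removed; the Set-based overload
        // returns a map whose entries may be missing or null-valued for
        // partitions with no committed offset.
        Map<TopicPartition, OffsetAndMetadata> committed =
                consumer.committed(Collections.singleton(tp));
        OffsetAndMetadata meta = (committed == null) ? null : committed.get(tp);
        return (meta == null) ? null : meta.offset();
    }
}
```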
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 7516ca346..75ecac951 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:3.9.1
+- org.apache.kafka:kafka-clients:4.0.0
diff --git a/pom.xml b/pom.xml
index 752dec822..a1bf27718 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@ under the License.
 
         <confluent.version>7.9.2</confluent.version>
         <flink.version>2.0.0</flink.version>
-        <kafka.version>3.9.1</kafka.version>
+        <kafka.version>4.0.0</kafka.version>
 
         <avro.version>1.12.0</avro.version>
 
diff --git a/tools/ci/log4j.properties b/tools/ci/log4j.properties
index 25ef1cf96..d518a5f0d 100644
--- a/tools/ci/log4j.properties
+++ b/tools/ci/log4j.properties
@@ -44,6 +44,10 @@ logger.flink.level = WARN
 logger.flinkconnector.name = org.apache.flink.connector
 logger.flinkconnector.level = INFO
 
+# Allow the LicenseChecker to log its output
+logger.licensechecker.name = org.apache.flink.tools.ci.licensecheck
+logger.licensechecker.level = INFO
+
 # Kafka producer and consumer level
 logger.kafka.name = org.apache.kafka
 logger.kafka.level = OFF
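To confirm which kafka-clients version actually ends up on the classpath after the bump (for example inside the shaded flink-sql-connector-kafka jar), the client library's own `AppInfoParser` can be queried; a quick sketch:

```java
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientsVersionCheck {
    public static void main(String[] args) {
        // Prints the kafka-clients version found on the classpath,
        // expected to be "4.0.0" after this upgrade.
        System.out.println(AppInfoParser.getVersion());
    }
}
```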