From 7412f58a10eee5f337b1e6c4d3ed9a73096b6cb5 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sun, 16 Nov 2025 23:30:10 +0000 Subject: [PATCH 1/7] Replace end offset consumer with admin client --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- sdks/java/io/kafka/build.gradle | 4 - sdks/java/io/kafka/kafka-201/build.gradle | 24 -- sdks/java/io/kafka/kafka-231/build.gradle | 24 -- sdks/java/io/kafka/kafka-241/build.gradle | 24 -- sdks/java/io/kafka/kafka-251/build.gradle | 24 -- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 35 +++ ...afkaIOReadImplementationCompatibility.java | 1 + .../beam/sdk/io/kafka/KafkaIOUtils.java | 5 + .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 59 +++-- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 244 ++++++++++++------ .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 52 ++++ .../io/kafka/upgrade/KafkaIOTranslation.java | 12 + .../kafka/upgrade/KafkaIOTranslationTest.java | 1 + 14 files changed, 308 insertions(+), 203 deletions(-) delete mode 100644 sdks/java/io/kafka/kafka-201/build.gradle delete mode 100644 sdks/java/io/kafka/kafka-231/build.gradle delete mode 100644 sdks/java/io/kafka/kafka-241/build.gradle delete mode 100644 sdks/java/io/kafka/kafka-251/build.gradle diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 18dbcad4fcd1..7c8be7a4c8fa 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -629,7 +629,7 @@ class BeamModulePlugin implements Plugin { def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" def everit_json_version = "1.14.2" - def kafka_version = "2.4.1" + def kafka_version = "2.8.2" def log4j2_version = "2.20.0" def nemo_version = "0.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index ba25078b64e3..1b5e8f19120a 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -36,10 +36,6 @@ ext { } def kafkaVersions = [ - '201': "2.0.1", - '231': "2.3.1", - '241': "2.4.1", - '251': "2.5.1", '282': "2.8.2", '312': "3.1.2", '390': "3.9.0", diff --git a/sdks/java/io/kafka/kafka-201/build.gradle b/sdks/java/io/kafka/kafka-201/build.gradle deleted file mode 100644 index a26ca4ac19cf..000000000000 --- a/sdks/java/io/kafka/kafka-201/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -project.ext { - delimited="2.0.1" - undelimited="201" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-231/build.gradle b/sdks/java/io/kafka/kafka-231/build.gradle deleted file mode 100644 index 712158dcd3ae..000000000000 --- a/sdks/java/io/kafka/kafka-231/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="2.3.1" - undelimited="231" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-241/build.gradle b/sdks/java/io/kafka/kafka-241/build.gradle deleted file mode 100644 index c0ac7df674b5..000000000000 --- a/sdks/java/io/kafka/kafka-241/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="2.4.1" - undelimited="241" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-251/build.gradle b/sdks/java/io/kafka/kafka-251/build.gradle deleted file mode 100644 index 4de9f97a738a..000000000000 --- a/sdks/java/io/kafka/kafka-251/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="2.5.1" - undelimited="251" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ad5535517646..eb11d5b0c025 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -114,6 +114,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -604,6 +605,7 @@ public static Read read() { return new AutoValue_KafkaIO_Read.Builder() .setTopics(new ArrayList<>()) .setTopicPartitions(new ArrayList<>()) + .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) @@ -695,6 +697,9 @@ public abstract static class Read @Pure public abstract @Nullable Coder getValueCoder(); + @Pure + public abstract SerializableFunction, Admin> getAdminFactoryFn(); + @Pure public abstract SerializableFunction, Consumer> getConsumerFactoryFn(); @@ -778,6 +783,9 @@ abstract static class Builder { abstract Builder setValueCoder(Coder valueCoder); + abstract Builder setAdminFactoryFn( + SerializableFunction, Admin> adminFactoryFn); + abstract Builder setConsumerFactoryFn( SerializableFunction, Consumer> consumerFactoryFn); @@ -861,6 +869,7 @@ static void setupExternalBuilder( // Set required defaults builder.setTopicPartitions(Collections.emptyList()); + builder.setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN); builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN); if (config.maxReadTime != null) { builder.setMaxReadTime(Duration.standardSeconds(config.maxReadTime)); @@ -1306,6 +1315,15 @@ public Read withValueDeserializerProviderAndCoder( .build(); } + /** + * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful + * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}. + */ + public Read withAdminFactoryFn( + SerializableFunction, Admin> adminFactoryFn) { + return toBuilder().setAdminFactoryFn(adminFactoryFn).build(); + } + /** * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}. 
@@ -1963,6 +1981,7 @@ public PCollection> expand(PBegin input) { ReadSourceDescriptors.read() .withConsumerConfigOverrides(kafkaRead.getConsumerConfig()) .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig()) + .withAdminFactoryFn(kafkaRead.getAdminFactoryFn()) .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn()) .withKeyDeserializerProviderAndCoder( kafkaRead.getKeyDeserializerProvider(), keyCoder) @@ -2479,6 +2498,9 @@ public abstract static class ReadSourceDescriptors @Pure abstract @Nullable Coder getValueCoder(); + @Pure + abstract SerializableFunction, Admin> getAdminFactoryFn(); + @Pure abstract SerializableFunction, Consumer> getConsumerFactoryFn(); @@ -2529,6 +2551,9 @@ abstract static class Builder { abstract ReadSourceDescriptors.Builder setOffsetConsumerConfig( @Nullable Map offsetConsumerConfig); + abstract ReadSourceDescriptors.Builder setAdminFactoryFn( + SerializableFunction, Admin> adminFactoryFn); + abstract ReadSourceDescriptors.Builder setConsumerFactoryFn( SerializableFunction, Consumer> consumerFactoryFn); @@ -2583,6 +2608,7 @@ abstract ReadSourceDescriptors.Builder setBadRecordErrorHandler( public static ReadSourceDescriptors read() { return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder() + .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setCommitOffsetEnabled(false) @@ -2683,6 +2709,15 @@ public ReadSourceDescriptors withValueDeserializerProviderAndCoder( .build(); } + /** + * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful + * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}. + */ + public ReadSourceDescriptors withAdminFactoryFn( + SerializableFunction, Admin> adminFactoryFn) { + return toBuilder().setAdminFactoryFn(adminFactoryFn).build(); + } + /** * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}. 
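A rough sketch of how the withAdminFactoryFn hook added above could be wired from pipeline code follows. It is illustrative only and not part of this change: the broker addresses, topic name, deserializers, and the request-timeout override are placeholder assumptions. The factory receives the same derived config map that the default Admin::create path gets, so a custom factory only needs to copy and adjust that map before building the Admin client used for end-offset estimation.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

public class AdminFactoryExample {
  public static KafkaIO.Read<Integer, Long> buildRead() {
    return KafkaIO.<Integer, Long>read()
        .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder servers
        .withTopic("my_topic") // placeholder topic
        .withKeyDeserializer(IntegerDeserializer.class)
        .withValueDeserializer(LongDeserializer.class)
        // The default factory is Admin::create; a custom factory can adjust the
        // derived config before the Admin client used for offset estimation is built.
        .withAdminFactoryFn(
            config -> {
              Map<String, Object> adminConfig = new HashMap<>(config);
              // Hypothetical override, shown only to motivate a custom factory.
              adminConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 15_000);
              return Admin.create(adminConfig);
            });
  }
}
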
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java index 8c5efb066d6e..4ae6651d4e0c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java @@ -83,6 +83,7 @@ enum KafkaIOReadProperties { TOPIC_PATTERN, KEY_CODER, VALUE_CODER, + ADMIN_FACTORY_FN, CONSUMER_FACTORY_FN, WATERMARK_FN(LEGACY), MAX_NUM_RECORDS(LEGACY) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 91aa85577959..24dde09400eb 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -76,6 +77,10 @@ public final class KafkaIOUtils { // lets allow these, applications can have better resume point for restarts. ); + // Default Kafka Admin supplier. + static final SerializableFunction, Admin> KAFKA_ADMIN_FACTORY_FN = + Admin::create; + // default Kafka 0.9 Consumer supplier. 
static final SerializableFunction, Consumer> KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a05abba06e75..bea240f31c25 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -67,10 +67,14 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; @@ -202,6 +206,8 @@ private static class Bounded extends ReadFromKafkaDoFn { private ReadFromKafkaDoFn( ReadSourceDescriptors transform, TupleTag>> recordTag) { + final SerializableFunction, Admin> adminFactoryFn = + transform.getAdminFactoryFn(); final SerializableFunction, Consumer> consumerFactoryFn = transform.getConsumerFactoryFn(); this.consumerConfig = transform.getConsumerConfig(); @@ -250,15 +256,14 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) public KafkaLatestOffsetEstimator load( final KafkaSourceDescriptor sourceDescriptor) { LOG.info( - "Creating Kafka consumer for offset estimation for {}", + "Creating Kafka admin for offset estimation for {}", sourceDescriptor); final Map config = KafkaIOUtils.overrideBootstrapServersConfig( consumerConfig, sourceDescriptor); - final Consumer consumer = - consumerFactoryFn.apply(config); + final Admin admin = adminFactoryFn.apply(config); return new KafkaLatestOffsetEstimator( - consumer, sourceDescriptor.getTopicPartition()); + admin, sourceDescriptor.getTopicPartition()); } })); this.pollConsumerCacheSupplier = @@ -355,37 +360,43 @@ public Consumer load( */ private static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable { - private final Consumer offsetConsumer; - private final Supplier offsetSupplier; - - KafkaLatestOffsetEstimator( - final Consumer offsetConsumer, final TopicPartition topicPartition) { - this.offsetConsumer = offsetConsumer; - this.offsetSupplier = + private static final ListOffsetsResult.ListOffsetsResultInfo DEFAULT_RESULT = + new ListOffsetsResult.ListOffsetsResultInfo( + Long.MIN_VALUE, Long.MIN_VALUE, Optional.empty()); + + private final Admin admin; + private final Supplier> + latestOffsetFutureSupplier; + private ListOffsetsResult.ListOffsetsResultInfo latestOffsetResult; + + KafkaLatestOffsetEstimator(final Admin admin, final TopicPartition topicPartition) { + this.admin = admin; + this.latestOffsetFutureSupplier = new ExpiringMemoizingSerializableSupplier<>( - () -> { - try { - return offsetConsumer - .endOffsets(Collections.singleton(topicPartition)) - .getOrDefault(topicPartition, Long.MIN_VALUE); - } catch (Throwable 
t) { - LOG.error("Failed to get end offset for {}", topicPartition, t); - return Long.MIN_VALUE; - } - }, + () -> + admin + .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest())) + .partitionResult(topicPartition), Duration.ofSeconds(1), - Long.MIN_VALUE, + KafkaFuture.completedFuture(DEFAULT_RESULT), Duration.ZERO); + this.latestOffsetResult = DEFAULT_RESULT; } @Override public long estimate() { - return offsetSupplier.get(); + try { + latestOffsetResult = latestOffsetFutureSupplier.get().getNow(latestOffsetResult); + } catch (Throwable t) { + LOG.error("Failed to get latest offset", t); + } + + return latestOffsetResult.offset(); } @Override public void close() { - offsetConsumer.close(); + admin.close(); } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 703d323090dd..c29fdffcd2cb 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -43,6 +43,7 @@ import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; +import java.io.Serializable; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -65,7 +66,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; @@ -124,6 +128,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -137,6 +144,7 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -165,6 +173,9 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,6 +197,8 @@ public class KafkaIOTest { * - test KafkaRecordCoder */ + @Rule public final MockitoRule m = MockitoJUnit.rule(); + @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -201,6 +214,36 @@ public class KafkaIOTest { static List mkKafkaTopics = ImmutableList.of("topic_a", "topic_b"); 
static String mkKafkaServers = "myServer1:9092,myServer2:9092"; + private static Admin mkMockAdmin( + List topics, int partitionsPerTopic, int numElements, Map config) { + final long timestampStartMillis = + (Long) + config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG, LOG_APPEND_START_TIME.getMillis()); + final Admin mock = Mockito.mock(Admin.class); + Mockito.when(mock.listOffsets(Mockito.anyMap())) + .thenReturn( + Stream.generate(topics::stream) + .flatMap( + stream -> + stream.flatMap( + topic -> + IntStream.range(0, partitionsPerTopic) + .mapToObj(i -> new TopicPartition(topic, i)))) + .limit(numElements) + .collect( + Collectors.collectingAndThen( + Collectors.groupingBy( + Function.identity(), + Collectors.collectingAndThen( + Collectors.counting(), + c -> + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + c, timestampStartMillis, Optional.empty())))), + ListOffsetsResult::new))); + return mock; + } + // Update mock consumer with records distributed among the given topics, each with given number // of partitions. Records are assigned in round-robin order among the partitions. private static MockConsumer mkMockConsumer( @@ -338,8 +381,7 @@ public void run() { return consumer; } - private static class ConsumerFactoryFn - implements SerializableFunction, Consumer> { + private static class FactoryFns implements Serializable { private final List topics; private final int partitionsPerTopic; private final int numElements; @@ -347,7 +389,7 @@ private static class ConsumerFactoryFn private SerializableFunction keyFunction; private SerializableFunction valueFunction; - ConsumerFactoryFn( + FactoryFns( List topics, int partitionsPerTopic, int numElements, @@ -360,7 +402,7 @@ private static class ConsumerFactoryFn valueFunction = i -> ByteBuffer.wrap(new byte[8]).putLong(i).array(); } - ConsumerFactoryFn( + FactoryFns( List topics, int partitionsPerTopic, int numElements, @@ -375,8 +417,11 @@ private static class ConsumerFactoryFn this.valueFunction = valueFunction; } - @Override - public Consumer apply(Map config) { + public Admin createAdmin(Map config) { + return mkMockAdmin(topics, partitionsPerTopic, numElements, config); + } + + public Consumer createConsumer(Map config) { return mkMockConsumer( topics, partitionsPerTopic, @@ -462,19 +507,21 @@ static KafkaIO.Read mkKafkaReadTransform( @Nullable Boolean offsetDeduplication, @Nullable List topics, @Nullable Boolean redistributeByRecordKey) { + FactoryFns factoryFns = + new FactoryFns( + topics != null + ? topics.stream().distinct().collect(Collectors.toList()) + : mkKafkaTopics, + 10, + numElements, + OffsetResetStrategy.EARLIEST); // 20 partitions KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(mkKafkaServers) .withTopics(topics != null ? topics : mkKafkaTopics) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - topics != null - ? 
topics.stream().distinct().collect(Collectors.toList()) - : mkKafkaTopics, - 10, - numElements, - OffsetResetStrategy.EARLIEST)) // 20 partitions + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); if (maxNumRecords != null) { @@ -558,6 +605,15 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() { new AvroGeneratedUser("ValueName" + i, i, "color" + i))); } + FactoryFns factoryFns = + new FactoryFns( + ImmutableList.of(topic), + 1, + numElements, + OffsetResetStrategy.EARLIEST, + new KeyAvroSerializableFunction(topic, schemaRegistryUrl), + new ValueAvroSerializableFunction(topic, schemaRegistryUrl)); + KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("localhost:9092") @@ -566,14 +622,8 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() { mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject, null)) .withValueDeserializer( mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), - 1, - numElements, - OffsetResetStrategy.EARLIEST, - new KeyAvroSerializableFunction(topic, schemaRegistryUrl), - new ValueAvroSerializableFunction(topic, schemaRegistryUrl))) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(numElements); PCollection> input = p.apply(reader.withoutMetadata()); @@ -603,6 +653,15 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { inputs.add(KV.of(i, new AvroGeneratedUser("ValueName" + i, i, "color" + i))); } + FactoryFns factoryFns = + new FactoryFns( + ImmutableList.of(topic), + 1, + numElements, + OffsetResetStrategy.EARLIEST, + i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(), + new ValueAvroSerializableFunction(topic, schemaRegistryUrl)); + KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("localhost:9092") @@ -610,14 +669,8 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer( mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), - 1, - numElements, - OffsetResetStrategy.EARLIEST, - i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(), - new ValueAvroSerializableFunction(topic, schemaRegistryUrl))) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(numElements); PCollection> input = p.apply(reader.withoutMetadata()); @@ -666,14 +719,15 @@ public void testDeserializationWithHeaders() { // onwards int numElements = 1000; String topic = "my_topic"; + FactoryFns factoryFns = + new FactoryFns(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("my_topic") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(numElements) .withKeyDeserializerAndCoder( KafkaIOTest.IntegerDeserializerWithHeadersAssertor.class, @@ -968,14 +1022,15 @@ public void testUnboundedSourceWithSingleTopic() { int numElements = 1000; String topic = "my_topic"; String bootStrapServer = "none"; 
+ FactoryFns factoryFns = + new FactoryFns(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopic("my_topic") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(numElements) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); @@ -996,14 +1051,15 @@ public void testUnboundedSourceWithExplicitPartitions() { String topic = "test"; List topics = ImmutableList.of(topic); String bootStrapServer = "none"; + FactoryFns factoryFns = + new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); // 10 partitions KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopicPartitions(ImmutableList.of(new TopicPartition(topic, 5))) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements / 10); @@ -1030,13 +1086,14 @@ public void testUnboundedSourceWithPattern() { "best", "gest", "hest", "jest", "lest", "nest", "pest", "rest", "test", "vest", "west", "zest"); String bootStrapServer = "none"; + FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPattern("[a-z]est") - .withConsumerFactoryFn( - new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements); @@ -1059,13 +1116,14 @@ public void testUnboundedSourceWithPartiallyMatchedPattern() { List topics = ImmutableList.of("test", "Test"); String bootStrapServer = "none"; + FactoryFns factoryFns = new FactoryFns(topics, 1, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopicPattern("[a-z]est") - .withConsumerFactoryFn( - new ConsumerFactoryFn(topics, 1, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numMatchedElements); @@ -1099,13 +1157,14 @@ public void testUnboundedSourceWithUnmatchedPattern() { int numElements = 1000; List topics = ImmutableList.of("chest", "crest", "egest", "guest", "quest", "wrest"); + FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPattern("[a-z]est") - .withConsumerFactoryFn( - new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements); @@ -1126,13 +1185,15 @@ 
public void testUnboundedSourceWithWrongTopic() { + "configuration and (?:make sure that provided topics exist|topic names).*")); int numElements = 1000; + FactoryFns factoryFns = + new FactoryFns(ImmutableList.of("my_topic"), 10, numElements, OffsetResetStrategy.EARLIEST); + KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("wrong_topic") // read from topic that doesn't exist - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of("my_topic"), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(numElements) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); @@ -1317,14 +1378,15 @@ public void testUnboundedSourceWithExceptionInKafkaFetch() { int numElements = 1000; String topic = "my_topic"; + FactoryFns factoryFns = + new FactoryFns(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("my_topic") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withMaxNumRecords(2 * numElements) // Try to read more messages than available. .withConsumerConfigUpdates(ImmutableMap.of("inject.error.at.eof", true)) .withKeyDeserializer(IntegerDeserializer.class) @@ -1349,17 +1411,16 @@ public void testUnboundedSourceWithoutBoundedWrapper() { final int numElements = 1000; final int numPartitions = 10; String topic = "testUnboundedSourceWithoutBoundedWrapper"; + FactoryFns factoryFns = + new FactoryFns( + ImmutableList.of(topic), numPartitions, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(topic) .withTopic(topic) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - ImmutableList.of(topic), - numPartitions, - numElements, - OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withTimestampPolicyFactory( @@ -1528,13 +1589,14 @@ public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Except int numElements = 100; // all the 20 partitions will have elements List topics = ImmutableList.of("topic_a", "topic_b"); + FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.LATEST); source = KafkaIO.read() .withBootstrapServers("none") .withTopics(topics) - .withConsumerFactoryFn( - new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.LATEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements) @@ -1670,6 +1732,29 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception { KafkaIO.read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopics(topics) + .withAdminFactoryFn( + config -> { + Admin mock = Mockito.mock(Admin.class); + Mockito.when(mock.listOffsets(Mockito.anyMap())) + .thenAnswer( + invocation -> { + final Map topicPartitionOffsets = + invocation.getArgument(0); + + return KafkaFuture.completedFuture( + topicPartitionOffsets.entrySet().stream() + .collect( + 
Collectors.collectingAndThen( + Collectors.toMap( + Map.Entry::getKey, + entry -> + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + 0L, 0L, Optional.empty()))), + ListOffsetsResult::new))); + }); + return mock; + }) .withConsumerFactoryFn(endOffsetErrorConsumerFactory) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) @@ -2109,6 +2194,8 @@ public void testExactlyOnceSink() { String topic = "test-eos"; String bootStrapServer = "none"; + FactoryFns factoryFns = + new FactoryFns(Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST); p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) .apply( @@ -2118,9 +2205,7 @@ public void testExactlyOnceSink() { .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withEOS(1, "test-eos") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) + .withConsumerFactoryFn(factoryFns::createConsumer) .withPublishTimestampFunction((e, ts) -> ts) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); @@ -2148,6 +2233,8 @@ public void testExactlyOnceSinkWithSendException() throws Throwable { thrown.expectMessage("fakeException"); String topic = "test"; + FactoryFns factoryFns = + new FactoryFns(Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST); p.apply(Create.of(ImmutableList.of(KV.of(1, 1L), KV.of(2, 2L)))) .apply( @@ -2157,9 +2244,7 @@ public void testExactlyOnceSinkWithSendException() throws Throwable { .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withEOS(1, "testException") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) + .withConsumerFactoryFn(factoryFns::createConsumer) .withProducerFactoryFn(new SendErrorProducerFactory())); try { @@ -2323,18 +2408,19 @@ public void testUnboundedSourceRawSizeMetric() { final String readStep = "readFromKafka"; final int numElements = 1000; final int numPartitionsPerTopic = 10; - final int recordSize = 12; // The size of key and value is defined in ConsumerFactoryFn. + final int recordSize = 12; // The size of key and value is defined in FactoryFns. 
List topics = ImmutableList.of("test"); + FactoryFns factoryFns = + new FactoryFns(topics, numPartitionsPerTopic, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPartitions( ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 8))) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - topics, numPartitionsPerTopic, numElements, OffsetResetStrategy.EARLIEST)) + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements / numPartitionsPerTopic * 2); // 2 is the # of partitions @@ -2394,17 +2480,17 @@ public void testSourceDisplayData() { @Test public void testSourceWithExplicitPartitionsDisplayData() { + FactoryFns factoryFns = + new FactoryFns( + Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST); // 10 partitions + KafkaIO.Read read = KafkaIO.readBytes() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopicPartitions( ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 6))) - .withConsumerFactoryFn( - new ConsumerFactoryFn( - Lists.newArrayList("test"), - 10, - 10, - OffsetResetStrategy.EARLIEST)); // 10 partitions + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer); DisplayData displayData = DisplayData.from(read); @@ -2417,13 +2503,15 @@ public void testSourceWithExplicitPartitionsDisplayData() { @Test public void testSourceWithPatternDisplayData() { + FactoryFns factoryFns = + new FactoryFns(Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST); + KafkaIO.Read read = KafkaIO.readBytes() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopicPattern("[a-z]est") - .withConsumerFactoryFn( - new ConsumerFactoryFn( - Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)); + .withAdminFactoryFn(factoryFns::createAdmin) + .withConsumerFactoryFn(factoryFns::createConsumer); DisplayData displayData = DisplayData.from(read); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 5e3e08a60664..041fc1a0d76e 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.runners.core.metrics.DistributionCell; @@ -67,6 +68,9 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -75,6 +79,7 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import 
org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -90,6 +95,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Mockito; public class ReadFromKafkaDoFnTest { @@ -123,6 +129,29 @@ private ReadSourceDescriptors makeReadSourceDescriptor( return ReadSourceDescriptors.read() .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) + .withAdminFactoryFn( + config -> { + Admin mock = Mockito.mock(Admin.class); + Mockito.when(mock.listOffsets(Mockito.anyMap())) + .thenAnswer( + invocation -> { + final Map topicPartitionOffsets = + invocation.getArgument(0); + + return KafkaFuture.completedFuture( + topicPartitionOffsets.entrySet().stream() + .collect( + Collectors.collectingAndThen( + Collectors.toMap( + Map.Entry::getKey, + entry -> + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + Long.MAX_VALUE, 0L, Optional.empty()))), + ListOffsetsResult::new))); + }); + return mock; + }) .withConsumerFactoryFn( new SerializableFunction, Consumer>() { @Override @@ -138,6 +167,29 @@ private ReadSourceDescriptors makeFailingReadSourceDescriptor( return ReadSourceDescriptors.read() .withKeyDeserializer(FailingDeserializer.class) .withValueDeserializer(FailingDeserializer.class) + .withAdminFactoryFn( + config -> { + Admin mock = Mockito.mock(Admin.class); + Mockito.when(mock.listOffsets(Mockito.anyMap())) + .thenAnswer( + invocation -> { + final Map topicPartitionOffsets = + invocation.getArgument(0); + + return KafkaFuture.completedFuture( + topicPartitionOffsets.entrySet().stream() + .collect( + Collectors.collectingAndThen( + Collectors.toMap( + Map.Entry::getKey, + entry -> + KafkaFuture.completedFuture( + new ListOffsetsResult.ListOffsetsResultInfo( + Long.MAX_VALUE, 0L, Optional.empty()))), + ListOffsetsResult::new))); + }); + return mock; + }) .withConsumerFactoryFn( new SerializableFunction, Consumer>() { @Override diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 51d9b028bab0..01401377cdb4 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -57,6 +57,7 @@ import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.TopicPartition; @@ -90,6 +91,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl .addNullableStringField("topic_pattern") .addNullableByteArrayField("key_coder") .addNullableByteArrayField("value_coder") + .addByteArrayField("admin_factory_fn") .addByteArrayField("consumer_factory_fn") .addNullableByteArrayField("watermark_fn") .addInt64Field("max_num_records") @@ -166,6 +168,9 
@@ public Row toConfigRow(Read transform) { if (transform.getValueCoder() != null) { fieldValues.put("value_coder", toByteArray(transform.getValueCoder())); } + if (transform.getAdminFactoryFn() != null) { + fieldValues.put("admin_factory_fn", toByteArray(transform.getAdminFactoryFn())); + } if (transform.getConsumerFactoryFn() != null) { fieldValues.put("consumer_factory_fn", toByteArray(transform.getConsumerFactoryFn())); } @@ -328,6 +333,13 @@ public Row toConfigRow(Read transform) { } } + byte[] adminFactoryFn = configRow.getBytes("admin_factory_fn"); + if (adminFactoryFn != null) { + transform = + transform.withAdminFactoryFn( + (SerializableFunction, Admin>) fromByteArray(adminFactoryFn)); + } + byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn"); if (consumerFactoryFn != null) { transform = diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java index 845e89b3b659..68c98ce44df8 100644 --- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java +++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java @@ -58,6 +58,7 @@ public class KafkaIOTranslationTest { READ_TRANSFORM_SCHEMA_MAPPING.put("getTopicPattern", "topic_pattern"); READ_TRANSFORM_SCHEMA_MAPPING.put("getKeyCoder", "key_coder"); READ_TRANSFORM_SCHEMA_MAPPING.put("getValueCoder", "value_coder"); + READ_TRANSFORM_SCHEMA_MAPPING.put("getAdminFactoryFn", "admin_factory_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getWatermarkFn", "watermark_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getMaxNumRecords", "max_num_records"); From fd9ca4008e505c0cdc69e78d7ab841bc2b17b611 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Mon, 17 Nov 2025 11:36:53 +0000 Subject: [PATCH 2/7] Apply offset consumer config overrides to admin client config --- .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index bea240f31c25..5b974dae5989 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -24,6 +24,7 @@ import java.math.MathContext; import java.time.Duration; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -210,6 +211,15 @@ private ReadFromKafkaDoFn( transform.getAdminFactoryFn(); final SerializableFunction, Consumer> consumerFactoryFn = transform.getConsumerFactoryFn(); + final @Nullable Map offsetConsumerConfigOverrides = + transform.getOffsetConsumerConfig(); + final Map offsetConsumerConfig; + if (offsetConsumerConfigOverrides == null) { + offsetConsumerConfig = transform.getConsumerConfig(); + } else { + offsetConsumerConfig = new HashMap<>(transform.getConsumerConfig()); + offsetConsumerConfig.putAll(offsetConsumerConfigOverrides); + } this.consumerConfig = transform.getConsumerConfig(); this.keyDeserializerProvider = Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider()); @@ -260,7 +270,7 @@ 
public KafkaLatestOffsetEstimator load( sourceDescriptor); final Map config = KafkaIOUtils.overrideBootstrapServersConfig( - consumerConfig, sourceDescriptor); + offsetConsumerConfig, sourceDescriptor); final Admin admin = adminFactoryFn.apply(config); return new KafkaLatestOffsetEstimator( admin, sourceDescriptor.getTopicPartition()); From 5b14dfdecbbdafcb9d68f81749b91f3be3902554 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 27 Nov 2025 19:22:14 +0000 Subject: [PATCH 3/7] Revert "Apply offset consumer config overrides to admin client config" This reverts commit 5f4bcbe9fa3b113f4603d9421a61370f677f6ec6. --- .../apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 5b974dae5989..bea240f31c25 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -24,7 +24,6 @@ import java.math.MathContext; import java.time.Duration; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -211,15 +210,6 @@ private ReadFromKafkaDoFn( transform.getAdminFactoryFn(); final SerializableFunction, Consumer> consumerFactoryFn = transform.getConsumerFactoryFn(); - final @Nullable Map offsetConsumerConfigOverrides = - transform.getOffsetConsumerConfig(); - final Map offsetConsumerConfig; - if (offsetConsumerConfigOverrides == null) { - offsetConsumerConfig = transform.getConsumerConfig(); - } else { - offsetConsumerConfig = new HashMap<>(transform.getConsumerConfig()); - offsetConsumerConfig.putAll(offsetConsumerConfigOverrides); - } this.consumerConfig = transform.getConsumerConfig(); this.keyDeserializerProvider = Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider()); @@ -270,7 +260,7 @@ public KafkaLatestOffsetEstimator load( sourceDescriptor); final Map config = KafkaIOUtils.overrideBootstrapServersConfig( - offsetConsumerConfig, sourceDescriptor); + consumerConfig, sourceDescriptor); final Admin admin = adminFactoryFn.apply(config); return new KafkaLatestOffsetEstimator( admin, sourceDescriptor.getTopicPartition()); From b6525443aa20fbf8f7c1ad237b9aceac1d462514 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 27 Nov 2025 19:24:32 +0000 Subject: [PATCH 4/7] Partially revert "Replace end offset consumer with admin client" This reverts commit 0d06b5e88bed4d799714e3e323a626bdc10a4eff. 
--- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 35 --- ...afkaIOReadImplementationCompatibility.java | 1 - .../beam/sdk/io/kafka/KafkaIOUtils.java | 5 - .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 59 ++--- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 244 ++++++------------ .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 52 ---- .../io/kafka/upgrade/KafkaIOTranslation.java | 12 - .../kafka/upgrade/KafkaIOTranslationTest.java | 1 - 8 files changed, 102 insertions(+), 307 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index eb11d5b0c025..ad5535517646 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -114,7 +114,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -605,7 +604,6 @@ public static Read read() { return new AutoValue_KafkaIO_Read.Builder() .setTopics(new ArrayList<>()) .setTopicPartitions(new ArrayList<>()) - .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) @@ -697,9 +695,6 @@ public abstract static class Read @Pure public abstract @Nullable Coder getValueCoder(); - @Pure - public abstract SerializableFunction, Admin> getAdminFactoryFn(); - @Pure public abstract SerializableFunction, Consumer> getConsumerFactoryFn(); @@ -783,9 +778,6 @@ abstract static class Builder { abstract Builder setValueCoder(Coder valueCoder); - abstract Builder setAdminFactoryFn( - SerializableFunction, Admin> adminFactoryFn); - abstract Builder setConsumerFactoryFn( SerializableFunction, Consumer> consumerFactoryFn); @@ -869,7 +861,6 @@ static void setupExternalBuilder( // Set required defaults builder.setTopicPartitions(Collections.emptyList()); - builder.setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN); builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN); if (config.maxReadTime != null) { builder.setMaxReadTime(Duration.standardSeconds(config.maxReadTime)); @@ -1315,15 +1306,6 @@ public Read withValueDeserializerProviderAndCoder( .build(); } - /** - * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful - * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}. - */ - public Read withAdminFactoryFn( - SerializableFunction, Admin> adminFactoryFn) { - return toBuilder().setAdminFactoryFn(adminFactoryFn).build(); - } - /** * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}. 
@@ -1981,7 +1963,6 @@ public PCollection> expand(PBegin input) { ReadSourceDescriptors.read() .withConsumerConfigOverrides(kafkaRead.getConsumerConfig()) .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig()) - .withAdminFactoryFn(kafkaRead.getAdminFactoryFn()) .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn()) .withKeyDeserializerProviderAndCoder( kafkaRead.getKeyDeserializerProvider(), keyCoder) @@ -2498,9 +2479,6 @@ public abstract static class ReadSourceDescriptors @Pure abstract @Nullable Coder getValueCoder(); - @Pure - abstract SerializableFunction, Admin> getAdminFactoryFn(); - @Pure abstract SerializableFunction, Consumer> getConsumerFactoryFn(); @@ -2551,9 +2529,6 @@ abstract static class Builder { abstract ReadSourceDescriptors.Builder setOffsetConsumerConfig( @Nullable Map offsetConsumerConfig); - abstract ReadSourceDescriptors.Builder setAdminFactoryFn( - SerializableFunction, Admin> adminFactoryFn); - abstract ReadSourceDescriptors.Builder setConsumerFactoryFn( SerializableFunction, Consumer> consumerFactoryFn); @@ -2608,7 +2583,6 @@ abstract ReadSourceDescriptors.Builder setBadRecordErrorHandler( public static ReadSourceDescriptors read() { return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder() - .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setCommitOffsetEnabled(false) @@ -2709,15 +2683,6 @@ public ReadSourceDescriptors withValueDeserializerProviderAndCoder( .build(); } - /** - * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful - * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}. - */ - public ReadSourceDescriptors withAdminFactoryFn( - SerializableFunction, Admin> adminFactoryFn) { - return toBuilder().setAdminFactoryFn(adminFactoryFn).build(); - } - /** * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}. 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java index 4ae6651d4e0c..8c5efb066d6e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java @@ -83,7 +83,6 @@ enum KafkaIOReadProperties { TOPIC_PATTERN, KEY_CODER, VALUE_CODER, - ADMIN_FACTORY_FN, CONSUMER_FACTORY_FN, WATERMARK_FN(LEGACY), MAX_NUM_RECORDS(LEGACY) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java index 24dde09400eb..91aa85577959 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -77,10 +76,6 @@ public final class KafkaIOUtils { // lets allow these, applications can have better resume point for restarts. ); - // Default Kafka Admin supplier. - static final SerializableFunction, Admin> KAFKA_ADMIN_FACTORY_FN = - Admin::create; - // default Kafka 0.9 Consumer supplier. 
static final SerializableFunction, Consumer> KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new; diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index bea240f31c25..a05abba06e75 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -67,14 +67,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; @@ -206,8 +202,6 @@ private static class Bounded extends ReadFromKafkaDoFn { private ReadFromKafkaDoFn( ReadSourceDescriptors transform, TupleTag>> recordTag) { - final SerializableFunction, Admin> adminFactoryFn = - transform.getAdminFactoryFn(); final SerializableFunction, Consumer> consumerFactoryFn = transform.getConsumerFactoryFn(); this.consumerConfig = transform.getConsumerConfig(); @@ -256,14 +250,15 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) public KafkaLatestOffsetEstimator load( final KafkaSourceDescriptor sourceDescriptor) { LOG.info( - "Creating Kafka admin for offset estimation for {}", + "Creating Kafka consumer for offset estimation for {}", sourceDescriptor); final Map config = KafkaIOUtils.overrideBootstrapServersConfig( consumerConfig, sourceDescriptor); - final Admin admin = adminFactoryFn.apply(config); + final Consumer consumer = + consumerFactoryFn.apply(config); return new KafkaLatestOffsetEstimator( - admin, sourceDescriptor.getTopicPartition()); + consumer, sourceDescriptor.getTopicPartition()); } })); this.pollConsumerCacheSupplier = @@ -360,43 +355,37 @@ public Consumer load( */ private static class KafkaLatestOffsetEstimator implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable { - private static final ListOffsetsResult.ListOffsetsResultInfo DEFAULT_RESULT = - new ListOffsetsResult.ListOffsetsResultInfo( - Long.MIN_VALUE, Long.MIN_VALUE, Optional.empty()); - - private final Admin admin; - private final Supplier> - latestOffsetFutureSupplier; - private ListOffsetsResult.ListOffsetsResultInfo latestOffsetResult; - - KafkaLatestOffsetEstimator(final Admin admin, final TopicPartition topicPartition) { - this.admin = admin; - this.latestOffsetFutureSupplier = + private final Consumer offsetConsumer; + private final Supplier offsetSupplier; + + KafkaLatestOffsetEstimator( + final Consumer offsetConsumer, final TopicPartition topicPartition) { + this.offsetConsumer = offsetConsumer; + this.offsetSupplier = new ExpiringMemoizingSerializableSupplier<>( - () -> - admin - .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest())) - .partitionResult(topicPartition), + () -> { + try { + return 
offsetConsumer + .endOffsets(Collections.singleton(topicPartition)) + .getOrDefault(topicPartition, Long.MIN_VALUE); + } catch (Throwable t) { + LOG.error("Failed to get end offset for {}", topicPartition, t); + return Long.MIN_VALUE; + } + }, Duration.ofSeconds(1), - KafkaFuture.completedFuture(DEFAULT_RESULT), + Long.MIN_VALUE, Duration.ZERO); - this.latestOffsetResult = DEFAULT_RESULT; } @Override public long estimate() { - try { - latestOffsetResult = latestOffsetFutureSupplier.get().getNow(latestOffsetResult); - } catch (Throwable t) { - LOG.error("Failed to get latest offset", t); - } - - return latestOffsetResult.offset(); + return offsetSupplier.get(); } @Override public void close() { - admin.close(); + offsetConsumer.close(); } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index c29fdffcd2cb..703d323090dd 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -43,7 +43,6 @@ import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; -import java.io.Serializable; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -66,10 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.Stream; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; @@ -128,9 +124,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -144,7 +137,6 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -173,9 +165,6 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,8 +186,6 @@ public class KafkaIOTest { * - test KafkaRecordCoder */ - @Rule public final MockitoRule m = MockitoJUnit.rule(); - @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -214,36 +201,6 @@ public class KafkaIOTest { static List mkKafkaTopics = ImmutableList.of("topic_a", 
"topic_b"); static String mkKafkaServers = "myServer1:9092,myServer2:9092"; - private static Admin mkMockAdmin( - List topics, int partitionsPerTopic, int numElements, Map config) { - final long timestampStartMillis = - (Long) - config.getOrDefault(TIMESTAMP_START_MILLIS_CONFIG, LOG_APPEND_START_TIME.getMillis()); - final Admin mock = Mockito.mock(Admin.class); - Mockito.when(mock.listOffsets(Mockito.anyMap())) - .thenReturn( - Stream.generate(topics::stream) - .flatMap( - stream -> - stream.flatMap( - topic -> - IntStream.range(0, partitionsPerTopic) - .mapToObj(i -> new TopicPartition(topic, i)))) - .limit(numElements) - .collect( - Collectors.collectingAndThen( - Collectors.groupingBy( - Function.identity(), - Collectors.collectingAndThen( - Collectors.counting(), - c -> - KafkaFuture.completedFuture( - new ListOffsetsResult.ListOffsetsResultInfo( - c, timestampStartMillis, Optional.empty())))), - ListOffsetsResult::new))); - return mock; - } - // Update mock consumer with records distributed among the given topics, each with given number // of partitions. Records are assigned in round-robin order among the partitions. private static MockConsumer mkMockConsumer( @@ -381,7 +338,8 @@ public void run() { return consumer; } - private static class FactoryFns implements Serializable { + private static class ConsumerFactoryFn + implements SerializableFunction, Consumer> { private final List topics; private final int partitionsPerTopic; private final int numElements; @@ -389,7 +347,7 @@ private static class FactoryFns implements Serializable { private SerializableFunction keyFunction; private SerializableFunction valueFunction; - FactoryFns( + ConsumerFactoryFn( List topics, int partitionsPerTopic, int numElements, @@ -402,7 +360,7 @@ private static class FactoryFns implements Serializable { valueFunction = i -> ByteBuffer.wrap(new byte[8]).putLong(i).array(); } - FactoryFns( + ConsumerFactoryFn( List topics, int partitionsPerTopic, int numElements, @@ -417,11 +375,8 @@ private static class FactoryFns implements Serializable { this.valueFunction = valueFunction; } - public Admin createAdmin(Map config) { - return mkMockAdmin(topics, partitionsPerTopic, numElements, config); - } - - public Consumer createConsumer(Map config) { + @Override + public Consumer apply(Map config) { return mkMockConsumer( topics, partitionsPerTopic, @@ -507,21 +462,19 @@ static KafkaIO.Read mkKafkaReadTransform( @Nullable Boolean offsetDeduplication, @Nullable List topics, @Nullable Boolean redistributeByRecordKey) { - FactoryFns factoryFns = - new FactoryFns( - topics != null - ? topics.stream().distinct().collect(Collectors.toList()) - : mkKafkaTopics, - 10, - numElements, - OffsetResetStrategy.EARLIEST); // 20 partitions KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(mkKafkaServers) .withTopics(topics != null ? topics : mkKafkaTopics) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + topics != null + ? 
topics.stream().distinct().collect(Collectors.toList()) + : mkKafkaTopics, + 10, + numElements, + OffsetResetStrategy.EARLIEST)) // 20 partitions .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); if (maxNumRecords != null) { @@ -605,15 +558,6 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() { new AvroGeneratedUser("ValueName" + i, i, "color" + i))); } - FactoryFns factoryFns = - new FactoryFns( - ImmutableList.of(topic), - 1, - numElements, - OffsetResetStrategy.EARLIEST, - new KeyAvroSerializableFunction(topic, schemaRegistryUrl), - new ValueAvroSerializableFunction(topic, schemaRegistryUrl)); - KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("localhost:9092") @@ -622,8 +566,14 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() { mockDeserializerProvider(schemaRegistryUrl, keySchemaSubject, null)) .withValueDeserializer( mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), + 1, + numElements, + OffsetResetStrategy.EARLIEST, + new KeyAvroSerializableFunction(topic, schemaRegistryUrl), + new ValueAvroSerializableFunction(topic, schemaRegistryUrl))) .withMaxNumRecords(numElements); PCollection> input = p.apply(reader.withoutMetadata()); @@ -653,15 +603,6 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { inputs.add(KV.of(i, new AvroGeneratedUser("ValueName" + i, i, "color" + i))); } - FactoryFns factoryFns = - new FactoryFns( - ImmutableList.of(topic), - 1, - numElements, - OffsetResetStrategy.EARLIEST, - i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(), - new ValueAvroSerializableFunction(topic, schemaRegistryUrl)); - KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("localhost:9092") @@ -669,8 +610,14 @@ public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() { .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer( mockDeserializerProvider(schemaRegistryUrl, valueSchemaSubject, null)) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), + 1, + numElements, + OffsetResetStrategy.EARLIEST, + i -> ByteBuffer.wrap(new byte[4]).putInt(i).array(), + new ValueAvroSerializableFunction(topic, schemaRegistryUrl))) .withMaxNumRecords(numElements); PCollection> input = p.apply(reader.withoutMetadata()); @@ -719,15 +666,14 @@ public void testDeserializationWithHeaders() { // onwards int numElements = 1000; String topic = "my_topic"; - FactoryFns factoryFns = - new FactoryFns(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("my_topic") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) .withMaxNumRecords(numElements) .withKeyDeserializerAndCoder( KafkaIOTest.IntegerDeserializerWithHeadersAssertor.class, @@ -1022,15 +968,14 @@ public void testUnboundedSourceWithSingleTopic() { int numElements = 1000; String topic = "my_topic"; String bootStrapServer = "none"; - FactoryFns factoryFns = - new FactoryFns(ImmutableList.of(topic), 10, numElements, 
OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopic("my_topic") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) .withMaxNumRecords(numElements) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); @@ -1051,15 +996,14 @@ public void testUnboundedSourceWithExplicitPartitions() { String topic = "test"; List topics = ImmutableList.of(topic); String bootStrapServer = "none"; - FactoryFns factoryFns = - new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); // 10 partitions KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopicPartitions(ImmutableList.of(new TopicPartition(topic, 5))) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements / 10); @@ -1086,14 +1030,13 @@ public void testUnboundedSourceWithPattern() { "best", "gest", "hest", "jest", "lest", "nest", "pest", "rest", "test", "vest", "west", "zest"); String bootStrapServer = "none"; - FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPattern("[a-z]est") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements); @@ -1116,14 +1059,13 @@ public void testUnboundedSourceWithPartiallyMatchedPattern() { List topics = ImmutableList.of("test", "Test"); String bootStrapServer = "none"; - FactoryFns factoryFns = new FactoryFns(topics, 1, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(bootStrapServer) .withTopicPattern("[a-z]est") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 1, numElements, OffsetResetStrategy.EARLIEST)) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numMatchedElements); @@ -1157,14 +1099,13 @@ public void testUnboundedSourceWithUnmatchedPattern() { int numElements = 1000; List topics = ImmutableList.of("chest", "crest", "egest", "guest", "quest", "wrest"); - FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPattern("[a-z]est") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.EARLIEST)) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements); @@ -1185,15 +1126,13 @@ public void testUnboundedSourceWithWrongTopic() { + "configuration and (?:make sure 
that provided topics exist|topic names).*")); int numElements = 1000; - FactoryFns factoryFns = - new FactoryFns(ImmutableList.of("my_topic"), 10, numElements, OffsetResetStrategy.EARLIEST); - KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("wrong_topic") // read from topic that doesn't exist - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of("my_topic"), 10, numElements, OffsetResetStrategy.EARLIEST)) .withMaxNumRecords(numElements) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class); @@ -1378,15 +1317,14 @@ public void testUnboundedSourceWithExceptionInKafkaFetch() { int numElements = 1000; String topic = "my_topic"; - FactoryFns factoryFns = - new FactoryFns(ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopic("my_topic") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST)) .withMaxNumRecords(2 * numElements) // Try to read more messages than available. .withConsumerConfigUpdates(ImmutableMap.of("inject.error.at.eof", true)) .withKeyDeserializer(IntegerDeserializer.class) @@ -1411,16 +1349,17 @@ public void testUnboundedSourceWithoutBoundedWrapper() { final int numElements = 1000; final int numPartitions = 10; String topic = "testUnboundedSourceWithoutBoundedWrapper"; - FactoryFns factoryFns = - new FactoryFns( - ImmutableList.of(topic), numPartitions, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers(topic) .withTopic(topic) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + ImmutableList.of(topic), + numPartitions, + numElements, + OffsetResetStrategy.EARLIEST)) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withTimestampPolicyFactory( @@ -1589,14 +1528,13 @@ public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Except int numElements = 100; // all the 20 partitions will have elements List topics = ImmutableList.of("topic_a", "topic_b"); - FactoryFns factoryFns = new FactoryFns(topics, 10, numElements, OffsetResetStrategy.LATEST); source = KafkaIO.read() .withBootstrapServers("none") .withTopics(topics) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn(topics, 10, numElements, OffsetResetStrategy.LATEST)) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements) @@ -1732,29 +1670,6 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception { KafkaIO.read() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopics(topics) - .withAdminFactoryFn( - config -> { - Admin mock = Mockito.mock(Admin.class); - Mockito.when(mock.listOffsets(Mockito.anyMap())) - .thenAnswer( - invocation -> { - final Map topicPartitionOffsets = - invocation.getArgument(0); - - return KafkaFuture.completedFuture( - topicPartitionOffsets.entrySet().stream() - .collect( - Collectors.collectingAndThen( - Collectors.toMap( - Map.Entry::getKey, - entry -> - 
KafkaFuture.completedFuture( - new ListOffsetsResult.ListOffsetsResultInfo( - 0L, 0L, Optional.empty()))), - ListOffsetsResult::new))); - }); - return mock; - }) .withConsumerFactoryFn(endOffsetErrorConsumerFactory) .withKeyDeserializer(IntegerDeserializer.class) .withValueDeserializer(LongDeserializer.class) @@ -2194,8 +2109,6 @@ public void testExactlyOnceSink() { String topic = "test-eos"; String bootStrapServer = "none"; - FactoryFns factoryFns = - new FactoryFns(Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST); p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) .apply( @@ -2205,7 +2118,9 @@ public void testExactlyOnceSink() { .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withEOS(1, "test-eos") - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) .withPublishTimestampFunction((e, ts) -> ts) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); @@ -2233,8 +2148,6 @@ public void testExactlyOnceSinkWithSendException() throws Throwable { thrown.expectMessage("fakeException"); String topic = "test"; - FactoryFns factoryFns = - new FactoryFns(Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST); p.apply(Create.of(ImmutableList.of(KV.of(1, 1L), KV.of(2, 2L)))) .apply( @@ -2244,7 +2157,9 @@ public void testExactlyOnceSinkWithSendException() throws Throwable { .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) .withEOS(1, "testException") - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) .withProducerFactoryFn(new SendErrorProducerFactory())); try { @@ -2408,19 +2323,18 @@ public void testUnboundedSourceRawSizeMetric() { final String readStep = "readFromKafka"; final int numElements = 1000; final int numPartitionsPerTopic = 10; - final int recordSize = 12; // The size of key and value is defined in FactoryFns. + final int recordSize = 12; // The size of key and value is defined in ConsumerFactoryFn. 
List topics = ImmutableList.of("test"); - FactoryFns factoryFns = - new FactoryFns(topics, numPartitionsPerTopic, numElements, OffsetResetStrategy.EARLIEST); KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopicPartitions( ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 8))) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer) + .withConsumerFactoryFn( + new ConsumerFactoryFn( + topics, numPartitionsPerTopic, numElements, OffsetResetStrategy.EARLIEST)) .withKeyDeserializer(ByteArrayDeserializer.class) .withValueDeserializer(LongDeserializer.class) .withMaxNumRecords(numElements / numPartitionsPerTopic * 2); // 2 is the # of partitions @@ -2480,17 +2394,17 @@ public void testSourceDisplayData() { @Test public void testSourceWithExplicitPartitionsDisplayData() { - FactoryFns factoryFns = - new FactoryFns( - Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST); // 10 partitions - KafkaIO.Read read = KafkaIO.readBytes() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopicPartitions( ImmutableList.of(new TopicPartition("test", 5), new TopicPartition("test", 6))) - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer); + .withConsumerFactoryFn( + new ConsumerFactoryFn( + Lists.newArrayList("test"), + 10, + 10, + OffsetResetStrategy.EARLIEST)); // 10 partitions DisplayData displayData = DisplayData.from(read); @@ -2503,15 +2417,13 @@ public void testSourceWithExplicitPartitionsDisplayData() { @Test public void testSourceWithPatternDisplayData() { - FactoryFns factoryFns = - new FactoryFns(Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST); - KafkaIO.Read read = KafkaIO.readBytes() .withBootstrapServers("myServer1:9092,myServer2:9092") .withTopicPattern("[a-z]est") - .withAdminFactoryFn(factoryFns::createAdmin) - .withConsumerFactoryFn(factoryFns::createConsumer); + .withConsumerFactoryFn( + new ConsumerFactoryFn( + Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)); DisplayData displayData = DisplayData.from(read); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 041fc1a0d76e..5e3e08a60664 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.beam.runners.core.metrics.DistributionCell; @@ -68,9 +67,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -79,7 +75,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import 
org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.SerializationException; @@ -95,7 +90,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import org.mockito.Mockito; public class ReadFromKafkaDoFnTest { @@ -129,29 +123,6 @@ private ReadSourceDescriptors makeReadSourceDescriptor( return ReadSourceDescriptors.read() .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) - .withAdminFactoryFn( - config -> { - Admin mock = Mockito.mock(Admin.class); - Mockito.when(mock.listOffsets(Mockito.anyMap())) - .thenAnswer( - invocation -> { - final Map topicPartitionOffsets = - invocation.getArgument(0); - - return KafkaFuture.completedFuture( - topicPartitionOffsets.entrySet().stream() - .collect( - Collectors.collectingAndThen( - Collectors.toMap( - Map.Entry::getKey, - entry -> - KafkaFuture.completedFuture( - new ListOffsetsResult.ListOffsetsResultInfo( - Long.MAX_VALUE, 0L, Optional.empty()))), - ListOffsetsResult::new))); - }); - return mock; - }) .withConsumerFactoryFn( new SerializableFunction, Consumer>() { @Override @@ -167,29 +138,6 @@ private ReadSourceDescriptors makeFailingReadSourceDescriptor( return ReadSourceDescriptors.read() .withKeyDeserializer(FailingDeserializer.class) .withValueDeserializer(FailingDeserializer.class) - .withAdminFactoryFn( - config -> { - Admin mock = Mockito.mock(Admin.class); - Mockito.when(mock.listOffsets(Mockito.anyMap())) - .thenAnswer( - invocation -> { - final Map topicPartitionOffsets = - invocation.getArgument(0); - - return KafkaFuture.completedFuture( - topicPartitionOffsets.entrySet().stream() - .collect( - Collectors.collectingAndThen( - Collectors.toMap( - Map.Entry::getKey, - entry -> - KafkaFuture.completedFuture( - new ListOffsetsResult.ListOffsetsResultInfo( - Long.MAX_VALUE, 0L, Optional.empty()))), - ListOffsetsResult::new))); - }); - return mock; - }) .withConsumerFactoryFn( new SerializableFunction, Consumer>() { @Override diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 01401377cdb4..51d9b028bab0 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -57,7 +57,6 @@ import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.TopicPartition; @@ -91,7 +90,6 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl .addNullableStringField("topic_pattern") .addNullableByteArrayField("key_coder") .addNullableByteArrayField("value_coder") - .addByteArrayField("admin_factory_fn") .addByteArrayField("consumer_factory_fn") .addNullableByteArrayField("watermark_fn") .addInt64Field("max_num_records") @@ -168,9 +166,6 
@@ public Row toConfigRow(Read transform) { if (transform.getValueCoder() != null) { fieldValues.put("value_coder", toByteArray(transform.getValueCoder())); } - if (transform.getAdminFactoryFn() != null) { - fieldValues.put("admin_factory_fn", toByteArray(transform.getAdminFactoryFn())); - } if (transform.getConsumerFactoryFn() != null) { fieldValues.put("consumer_factory_fn", toByteArray(transform.getConsumerFactoryFn())); } @@ -333,13 +328,6 @@ public Row toConfigRow(Read transform) { } } - byte[] adminFactoryFn = configRow.getBytes("admin_factory_fn"); - if (adminFactoryFn != null) { - transform = - transform.withAdminFactoryFn( - (SerializableFunction, Admin>) fromByteArray(adminFactoryFn)); - } - byte[] consumerFactoryFn = configRow.getBytes("consumer_factory_fn"); if (consumerFactoryFn != null) { transform = diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java index 68c98ce44df8..845e89b3b659 100644 --- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java +++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java @@ -58,7 +58,6 @@ public class KafkaIOTranslationTest { READ_TRANSFORM_SCHEMA_MAPPING.put("getTopicPattern", "topic_pattern"); READ_TRANSFORM_SCHEMA_MAPPING.put("getKeyCoder", "key_coder"); READ_TRANSFORM_SCHEMA_MAPPING.put("getValueCoder", "value_coder"); - READ_TRANSFORM_SCHEMA_MAPPING.put("getAdminFactoryFn", "admin_factory_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getWatermarkFn", "watermark_fn"); READ_TRANSFORM_SCHEMA_MAPPING.put("getMaxNumRecords", "max_num_records"); From a14a3f304ed0e865ebfce1677a03caa39d83f518 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 27 Nov 2025 23:19:44 +0000 Subject: [PATCH 5/7] Use currentLag and position to set end offset estimate --- .../beam/gradle/BeamModulePlugin.groovy | 4 +- sdks/java/io/kafka/build.gradle | 4 +- sdks/java/io/kafka/kafka-312/build.gradle | 24 ---- sdks/java/io/kafka/kafka-390/build.gradle | 24 ---- .../{kafka-282 => kafka-391}/build.gradle | 4 +- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 107 +++++------------- .../sdk/io/kafka/KafkaCommitOffsetTest.java | 4 +- settings.gradle.kts | 16 +-- 8 files changed, 36 insertions(+), 151 deletions(-) delete mode 100644 sdks/java/io/kafka/kafka-312/build.gradle delete mode 100644 sdks/java/io/kafka/kafka-390/build.gradle rename sdks/java/io/kafka/{kafka-282 => kafka-391}/build.gradle (95%) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7c8be7a4c8fa..1bb030ce5cfe 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -629,7 +629,7 @@ class BeamModulePlugin implements Plugin { def jaxb_api_version = "2.3.3" def jsr305_version = "3.0.2" def everit_json_version = "1.14.2" - def kafka_version = "2.8.2" + def kafka_version = "3.9.1" def log4j2_version = "2.20.0" def nemo_version = "0.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom @@ -836,7 +836,7 @@ class BeamModulePlugin implements Plugin { jupiter_api : 
"org.junit.jupiter:junit-jupiter-api:$jupiter_version", jupiter_engine : "org.junit.jupiter:junit-jupiter-engine:$jupiter_version", jupiter_params : "org.junit.jupiter:junit-jupiter-params:$jupiter_version", - kafka : "org.apache.kafka:kafka_2.11:$kafka_version", + kafka : "org.apache.kafka:kafka_2.12:$kafka_version", kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version", log4j : "log4j:log4j:1.2.17", log4j_over_slf4j : "org.slf4j:log4j-over-slf4j:$slf4j_version", diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 1b5e8f19120a..0e7bf43a8c9c 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -36,9 +36,7 @@ ext { } def kafkaVersions = [ - '282': "2.8.2", - '312': "3.1.2", - '390': "3.9.0", + '391': "3.9.1", ] kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")} diff --git a/sdks/java/io/kafka/kafka-312/build.gradle b/sdks/java/io/kafka/kafka-312/build.gradle deleted file mode 100644 index af2ad3717b65..000000000000 --- a/sdks/java/io/kafka/kafka-312/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -project.ext { - delimited="3.1.2" - undelimited="312" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-390/build.gradle b/sdks/java/io/kafka/kafka-390/build.gradle deleted file mode 100644 index 8c8821386265..000000000000 --- a/sdks/java/io/kafka/kafka-390/build.gradle +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -project.ext { - delimited="3.9.0" - undelimited="390" - sdfCompatible=true -} - -apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/kafka-282/build.gradle b/sdks/java/io/kafka/kafka-391/build.gradle similarity index 95% rename from sdks/java/io/kafka/kafka-282/build.gradle rename to sdks/java/io/kafka/kafka-391/build.gradle index b754d93077e5..d2f96debc782 100644 --- a/sdks/java/io/kafka/kafka-282/build.gradle +++ b/sdks/java/io/kafka/kafka-391/build.gradle @@ -16,8 +16,8 @@ * limitations under the License. */ project.ext { - delimited="2.8.2" - undelimited="282" + delimited="3.9.1" + undelimited="391" sdfCompatible=true } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a05abba06e75..341c15ba8f59 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -19,7 +19,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.io.Closeable; import java.math.BigDecimal; import java.math.MathContext; import java.time.Duration; @@ -27,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Supplier; +import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors; import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; @@ -49,7 +48,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExpiringMemoizingSerializableSupplier; import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.SerializableSupplier; @@ -71,6 +69,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; @@ -235,30 +234,12 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor) CacheBuilder.newBuilder() .concurrencyLevel(Runtime.getRuntime().availableProcessors()) .weakValues() - .removalListener( - (RemovalNotification - notification) -> { - final @Nullable KafkaLatestOffsetEstimator value; - if (notification.getCause() == RemovalCause.COLLECTED - && (value = notification.getValue()) != null) { - value.close(); - } - }) .build( - new CacheLoader() { + new CacheLoader() { @Override - public KafkaLatestOffsetEstimator load( - final KafkaSourceDescriptor sourceDescriptor) { - LOG.info( - "Creating Kafka consumer for offset estimation for {}", - sourceDescriptor); - final Map config = - KafkaIOUtils.overrideBootstrapServersConfig( - consumerConfig, sourceDescriptor); - final Consumer consumer = - consumerFactoryFn.apply(config); - return new KafkaLatestOffsetEstimator( - consumer, sourceDescriptor.getTopicPartition()); + public AtomicLong load(final 
KafkaSourceDescriptor sourceDescriptor) { + LOG.info("Creating end offset estimator for {}", sourceDescriptor); + return new AtomicLong(Long.MIN_VALUE); } })); this.pollConsumerCacheSupplier = @@ -319,8 +300,7 @@ public Consumer load( private final SerializableSupplier> avgRecordSizeCacheSupplier; - private final SerializableSupplier< - LoadingCache> + private final SerializableSupplier> latestOffsetEstimatorCacheSupplier; private final SerializableSupplier>> @@ -329,8 +309,7 @@ public Consumer load( private transient @MonotonicNonNull LoadingCache avgRecordSizeCache; - private transient @MonotonicNonNull LoadingCache< - KafkaSourceDescriptor, KafkaLatestOffsetEstimator> + private transient @MonotonicNonNull LoadingCache latestOffsetEstimatorCache; private transient @MonotonicNonNull LoadingCache> @@ -349,46 +328,6 @@ public Consumer load( @VisibleForTesting static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX; - /** - * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to - * fetch backlog. - */ - private static class KafkaLatestOffsetEstimator - implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable { - private final Consumer offsetConsumer; - private final Supplier offsetSupplier; - - KafkaLatestOffsetEstimator( - final Consumer offsetConsumer, final TopicPartition topicPartition) { - this.offsetConsumer = offsetConsumer; - this.offsetSupplier = - new ExpiringMemoizingSerializableSupplier<>( - () -> { - try { - return offsetConsumer - .endOffsets(Collections.singleton(topicPartition)) - .getOrDefault(topicPartition, Long.MIN_VALUE); - } catch (Throwable t) { - LOG.error("Failed to get end offset for {}", topicPartition, t); - return Long.MIN_VALUE; - } - }, - Duration.ofSeconds(1), - Long.MIN_VALUE, - Duration.ZERO); - } - - @Override - public long estimate() { - return offsetSupplier.get(); - } - - @Override - public void close() { - offsetConsumer.close(); - } - } - @GetInitialRestriction @RequiresNonNull({"pollConsumerCache"}) public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) { @@ -500,8 +439,8 @@ public double getSize( @RequiresNonNull({"latestOffsetEstimatorCache"}) public UnsplittableRestrictionTracker restrictionTracker( @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) { - final LoadingCache - latestOffsetEstimatorCache = this.latestOffsetEstimatorCache; + final LoadingCache latestOffsetEstimatorCache = + this.latestOffsetEstimatorCache; if (restriction.getTo() < Long.MAX_VALUE) { return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction)); @@ -510,9 +449,10 @@ public UnsplittableRestrictionTracker restrictionTracker( // OffsetEstimators are cached for each topic-partition because they hold a stateful connection, // so we want to minimize the amount of connections that we start and track with Kafka. Another // point is that it has a memoized backlog, and this should make that more reusable estimations. 
+ final AtomicLong latestOffsetEstimator = + latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor); return new UnsplittableRestrictionTracker<>( - new GrowableOffsetRangeTracker( - restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor))); + new GrowableOffsetRangeTracker(restriction.getFrom(), latestOffsetEstimator::get)); } @ProcessElement @@ -525,14 +465,13 @@ public ProcessContinuation processElement( throws Exception { final LoadingCache avgRecordSizeCache = this.avgRecordSizeCache; - final LoadingCache - latestOffsetEstimatorCache = this.latestOffsetEstimatorCache; + final LoadingCache latestOffsetEstimatorCache = + this.latestOffsetEstimatorCache; final LoadingCache> pollConsumerCache = this.pollConsumerCache; final MovingAvg avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor); - final KafkaLatestOffsetEstimator latestOffsetEstimator = - latestOffsetEstimatorCache.get(kafkaSourceDescriptor); + final AtomicLong latestOffsetEstimator = latestOffsetEstimatorCache.get(kafkaSourceDescriptor); final Consumer consumer = pollConsumerCache.get(kafkaSourceDescriptor); final Deserializer keyDeserializerInstance = Preconditions.checkStateNotNull(this.keyDeserializerInstance); @@ -580,6 +519,14 @@ public ProcessContinuation processElement( // Fetch the next records. final ConsumerRecords rawRecords = consumer.poll(remainingTimeout); final Duration elapsed = pollTimer.elapsed(); + try { + final long position = consumer.position(topicPartition); + consumer + .currentLag(topicPartition) + .ifPresent(lag -> latestOffsetEstimator.lazySet(position + lag)); + } catch (KafkaException e) { + } + try { remainingTimeout = remainingTimeout.minus(elapsed); } catch (ArithmeticException e) { @@ -687,7 +634,7 @@ public ProcessContinuation processElement( final long estimatedBacklogBytes = (long) - (BigDecimal.valueOf(latestOffsetEstimator.estimate()) + (BigDecimal.valueOf(latestOffsetEstimator.get()) .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128) .doubleValue() * avgRecordSize.get()); @@ -752,8 +699,8 @@ public void setup() throws Exception { public void teardown() throws Exception { final LoadingCache avgRecordSizeCache = this.avgRecordSizeCache; - final LoadingCache - latestOffsetEstimatorCache = this.latestOffsetEstimatorCache; + final LoadingCache latestOffsetEstimatorCache = + this.latestOffsetEstimatorCache; final LoadingCache> pollConsumerCache = this.pollConsumerCache; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java index c16e25510ab8..b64f5edabe76 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.io.kafka; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -236,7 +236,7 @@ public synchronized void commitSync(Map offse } @Override - public synchronized void close(long timeout, TimeUnit unit) { + public synchronized void close(Duration timeout) { // Ignore closing since we're using a single consumer. 
} } diff --git a/settings.gradle.kts b/settings.gradle.kts index bea48565bfc4..6545fa7623f6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -344,20 +344,8 @@ project(":beam-test-gha").projectDir = file(".github") include("beam-validate-runner") project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner") include("com.google.api.gax.batching") -include("sdks:java:io:kafka:kafka-390") -findProject(":sdks:java:io:kafka:kafka-390")?.name = "kafka-390" -include("sdks:java:io:kafka:kafka-312") -findProject(":sdks:java:io:kafka:kafka-312")?.name = "kafka-312" -include("sdks:java:io:kafka:kafka-282") -findProject(":sdks:java:io:kafka:kafka-282")?.name = "kafka-282" -include("sdks:java:io:kafka:kafka-251") -findProject(":sdks:java:io:kafka:kafka-251")?.name = "kafka-251" -include("sdks:java:io:kafka:kafka-241") -findProject(":sdks:java:io:kafka:kafka-241")?.name = "kafka-241" -include("sdks:java:io:kafka:kafka-231") -findProject(":sdks:java:io:kafka:kafka-231")?.name = "kafka-231" -include("sdks:java:io:kafka:kafka-201") -findProject(":sdks:java:io:kafka:kafka-201")?.name = "kafka-201" +include("sdks:java:io:kafka:kafka-391") +findProject(":sdks:java:io:kafka:kafka-391")?.name = "kafka-391" include("sdks:java:managed") findProject(":sdks:java:managed")?.name = "managed" include("sdks:java:io:iceberg") From 0859326583a1e3fb120c7c954cc113479e2a8f9f Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sat, 20 Dec 2025 14:07:59 +0000 Subject: [PATCH 6/7] Replace KafkaServerStartable with KafkaServer --- .../apache/beam/gradle/BeamModulePlugin.groovy | 1 + runners/spark/spark_runner.gradle | 3 ++- .../streaming/utils/EmbeddedKafkaCluster.java | 17 +++++++++++------ sdks/java/testing/kafka-service/build.gradle | 1 + .../beam/sdk/testing/kafka/LocalKafka.java | 11 ++++++++--- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 1bb030ce5cfe..465367b2de2d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -838,6 +838,7 @@ class BeamModulePlugin implements Plugin { jupiter_params : "org.junit.jupiter:junit-jupiter-params:$jupiter_version", kafka : "org.apache.kafka:kafka_2.12:$kafka_version", kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version", + kafka_server : "org.apache.kafka:kafka-server:$kafka_version", log4j : "log4j:log4j:1.2.17", log4j_over_slf4j : "org.slf4j:log4j-over-slf4j:$slf4j_version", log4j2_api : "org.apache.logging.log4j:log4j-api:$log4j2_version", diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle index ecdfc8f0f697..cfe1645c70c2 100644 --- a/runners/spark/spark_runner.gradle +++ b/runners/spark/spark_runner.gradle @@ -200,8 +200,9 @@ dependencies { testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration") testImplementation project(":sdks:java:harness") testImplementation library.java.avro - testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1" + testImplementation library.java.kafka testImplementation library.java.kafka_clients + testImplementation library.java.kafka_server testImplementation library.java.junit testImplementation library.java.mockito_core testImplementation "org.assertj:assertj-core:3.11.1" diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java index df5646fed590..3acbf664e2cf 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java @@ -29,7 +29,7 @@ import java.util.Properties; import java.util.Random; import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; +import kafka.server.KafkaServer; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; @@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster { private final String brokerList; - private final List brokers; + private final List brokers; private final List logDirs; private EmbeddedKafkaCluster(String zkConnection) { @@ -114,15 +114,20 @@ public void startup() { properties.setProperty("offsets.topic.replication.factor", "1"); properties.setProperty("log.flush.interval.messages", String.valueOf(1)); - KafkaServerStartable broker = startBroker(properties); + KafkaServer broker = startBroker(properties); brokers.add(broker); logDirs.add(logDir); } } - private static KafkaServerStartable startBroker(Properties props) { - KafkaServerStartable server = new KafkaServerStartable(new KafkaConfig(props)); + private static KafkaServer startBroker(Properties props) { + KafkaServer server = + new KafkaServer( + new KafkaConfig(props), + KafkaServer.$lessinit$greater$default$2(), + KafkaServer.$lessinit$greater$default$3(), + KafkaServer.$lessinit$greater$default$4()); server.startup(); return server; } @@ -148,7 +153,7 @@ public String getZkConnection() { @SuppressWarnings("Slf4jDoNotLogMessageOfExceptionExplicitly") public void shutdown() { - for (KafkaServerStartable broker : brokers) { + for (KafkaServer broker : brokers) { try { broker.shutdown(); } catch (Exception e) { diff --git a/sdks/java/testing/kafka-service/build.gradle b/sdks/java/testing/kafka-service/build.gradle index abd186f98b1d..57e43db1cca9 100644 --- a/sdks/java/testing/kafka-service/build.gradle +++ b/sdks/java/testing/kafka-service/build.gradle @@ -28,6 +28,7 @@ ext.summary = """Self-contained Kafka service for testing IO transforms.""" dependencies { testImplementation library.java.kafka + testImplementation library.java.kafka_server testImplementation "org.apache.zookeeper:zookeeper:3.5.6" testRuntimeOnly library.java.slf4j_log4j12 } diff --git a/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java b/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java index 71ec61a3e41e..dc88f3fe0a5a 100644 --- a/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java +++ b/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java @@ -20,10 +20,10 @@ import java.nio.file.Files; import java.util.Properties; import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; +import kafka.server.KafkaServer; public class LocalKafka { - private final KafkaServerStartable server; + private final KafkaServer server; LocalKafka(int kafkaPort, int zookeeperPort) throws Exception { Properties kafkaProperties = new Properties(); @@ -31,7 
+31,12 @@ public class LocalKafka { kafkaProperties.setProperty("zookeeper.connect", String.format("localhost:%s", zookeeperPort)); kafkaProperties.setProperty("offsets.topic.replication.factor", "1"); kafkaProperties.setProperty("log.dir", Files.createTempDirectory("kafka-log-").toString()); - server = new KafkaServerStartable(KafkaConfig.fromProps(kafkaProperties)); + server = + new KafkaServer( + KafkaConfig.fromProps(kafkaProperties), + KafkaServer.$lessinit$greater$default$2(), + KafkaServer.$lessinit$greater$default$3(), + KafkaServer.$lessinit$greater$default$4()); } public void start() { From 3d6e871d77994640b903245d42bcf589e96e22a3 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sun, 21 Dec 2025 22:33:59 +0000 Subject: [PATCH 7/7] Fix type ambiguity of method argument in test --- .../org/apache/beam/it/kafka/KafkaResourceManagerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java index 8c870815efc4..f25bca06c226 100644 --- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java +++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java @@ -166,7 +166,8 @@ public void testCleanupShouldDropNonStaticTopic() throws IOException { KafkaResourceManager tm = new KafkaResourceManager(kafkaClient, container, builder); tm.cleanupAll(); - verify(kafkaClient).deleteTopics(argThat(list -> list.size() == numTopics)); + verify(kafkaClient) + .deleteTopics(argThat((Collection list) -> list.size() == numTopics)); } @Test