diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 18dbcad4fcd1..465367b2de2d 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -629,7 +629,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def jaxb_api_version = "2.3.3"
     def jsr305_version = "3.0.2"
     def everit_json_version = "1.14.2"
-    def kafka_version = "2.4.1"
+    def kafka_version = "3.9.1"
     def log4j2_version = "2.20.0"
     def nemo_version = "0.1"
     // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
@@ -836,8 +836,9 @@
         jupiter_api                                 : "org.junit.jupiter:junit-jupiter-api:$jupiter_version",
         jupiter_engine                              : "org.junit.jupiter:junit-jupiter-engine:$jupiter_version",
         jupiter_params                              : "org.junit.jupiter:junit-jupiter-params:$jupiter_version",
-        kafka                                       : "org.apache.kafka:kafka_2.11:$kafka_version",
+        kafka                                       : "org.apache.kafka:kafka_2.12:$kafka_version",
         kafka_clients                               : "org.apache.kafka:kafka-clients:$kafka_version",
+        kafka_server                                : "org.apache.kafka:kafka-server:$kafka_version",
         log4j                                       : "log4j:log4j:1.2.17",
         log4j_over_slf4j                            : "org.slf4j:log4j-over-slf4j:$slf4j_version",
         log4j2_api                                  : "org.apache.logging.log4j:log4j-api:$log4j2_version",
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
index 8c870815efc4..f25bca06c226 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaResourceManagerTest.java
@@ -166,7 +166,8 @@ public void testCleanupShouldDropNonStaticTopic() throws IOException {
     KafkaResourceManager tm = new KafkaResourceManager(kafkaClient, container, builder);
 
     tm.cleanupAll();
 
-    verify(kafkaClient).deleteTopics(argThat(list -> list.size() == numTopics));
+    verify(kafkaClient)
+        .deleteTopics(argThat((Collection<String> list) -> list.size() == numTopics));
   }
 
   @Test
diff --git a/runners/spark/spark_runner.gradle b/runners/spark/spark_runner.gradle
index ecdfc8f0f697..cfe1645c70c2 100644
--- a/runners/spark/spark_runner.gradle
+++ b/runners/spark/spark_runner.gradle
@@ -200,8 +200,9 @@ dependencies {
   testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
   testImplementation project(":sdks:java:harness")
   testImplementation library.java.avro
-  testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
+  testImplementation library.java.kafka
   testImplementation library.java.kafka_clients
+  testImplementation library.java.kafka_server
   testImplementation library.java.junit
   testImplementation library.java.mockito_core
   testImplementation "org.assertj:assertj-core:3.11.1"
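The explicit parameter type in the rewritten argThat matcher above is not cosmetic: Kafka 3.x adds a second Admin.deleteTopics overload taking a TopicCollection (KIP-516), so a bare `list -> ...` lambda leaves the compiler unable to pick an overload and the old one-liner stops compiling. A minimal sketch of the same pattern, assuming Mockito and kafka-clients 3.x on the classpath (the class and topic names are illustrative):

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.admin.AdminClient;

public class DeleteTopicsVerifyExample {
  public static void main(String[] args) {
    AdminClient admin = mock(AdminClient.class);

    // Stand-in for the code under test, which deletes two topics.
    admin.deleteTopics(Arrays.asList("topic-a", "topic-b"));

    // Writing out (Collection<String> topics) pins the matcher to the
    // Collection<String> overload; no cast on the argThat result is needed.
    verify(admin).deleteTopics(argThat((Collection<String> topics) -> topics.size() == 2));
  }
}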
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index df5646fed590..3acbf664e2cf 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -29,7 +29,7 @@
 import java.util.Properties;
 import java.util.Random;
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster {
 
   private final String brokerList;
 
-  private final List<KafkaServerStartable> brokers;
+  private final List<KafkaServer> brokers;
   private final List<String> logDirs;
 
   private EmbeddedKafkaCluster(String zkConnection) {
@@ -114,15 +114,20 @@ public void startup() {
       properties.setProperty("offsets.topic.replication.factor", "1");
       properties.setProperty("log.flush.interval.messages", String.valueOf(1));
 
-      KafkaServerStartable broker = startBroker(properties);
+      KafkaServer broker = startBroker(properties);
       brokers.add(broker);
       logDirs.add(logDir);
     }
   }
 
-  private static KafkaServerStartable startBroker(Properties props) {
-    KafkaServerStartable server = new KafkaServerStartable(new KafkaConfig(props));
+  private static KafkaServer startBroker(Properties props) {
+    KafkaServer server =
+        new KafkaServer(
+            new KafkaConfig(props),
+            KafkaServer.$lessinit$greater$default$2(),
+            KafkaServer.$lessinit$greater$default$3(),
+            KafkaServer.$lessinit$greater$default$4());
     server.startup();
     return server;
   }
@@ -148,7 +153,7 @@ public String getZkConnection() {
 
   @SuppressWarnings("Slf4jDoNotLogMessageOfExceptionExplicitly")
   public void shutdown() {
-    for (KafkaServerStartable broker : brokers) {
+    for (KafkaServer broker : brokers) {
       try {
         broker.shutdown();
       } catch (Exception e) {
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index ba25078b64e3..0e7bf43a8c9c 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -36,13 +36,7 @@ ext {
 }
 
 def kafkaVersions = [
-  '201': "2.0.1",
-  '231': "2.3.1",
-  '241': "2.4.1",
-  '251': "2.5.1",
-  '282': "2.8.2",
-  '312': "3.1.2",
-  '390': "3.9.0",
+  '391': "3.9.1",
 ]
 
 kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}
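The KafkaServer.$lessinit$greater$default$N() calls that replace KafkaServerStartable above are the standard way to reach a Scala constructor's default arguments from Java: scalac compiles the constructor name <init> into the JVM-safe identifier $lessinit$greater and emits one static $default$N method per defaulted parameter, numbered from 1. A sketch of the equivalent helper, assuming the Kafka 3.x constructor shape KafkaServer(config, time = Time.SYSTEM, threadNamePrefix = None, enableForwarding = false):

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;

public class EmbeddedBrokerFactory {
  // Equivalent to the Scala call `new KafkaServer(config)`: each $default$N
  // static method returns the default value of the N-th constructor parameter.
  static KafkaServer newBroker(Properties props) {
    return new KafkaServer(
        new KafkaConfig(props),
        KafkaServer.$lessinit$greater$default$2(), // time: Time.SYSTEM
        KafkaServer.$lessinit$greater$default$3(), // threadNamePrefix: None
        KafkaServer.$lessinit$greater$default$4()); // enableForwarding: false
  }
}

KafkaServerStartable no longer exists in Kafka 3.x, which is why both embedded-broker test utilities in this change construct KafkaServer directly.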
diff --git a/sdks/java/io/kafka/kafka-231/build.gradle b/sdks/java/io/kafka/kafka-231/build.gradle
deleted file mode 100644
index 712158dcd3ae..000000000000
--- a/sdks/java/io/kafka/kafka-231/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="2.3.1"
-  undelimited="231"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-241/build.gradle b/sdks/java/io/kafka/kafka-241/build.gradle
deleted file mode 100644
index c0ac7df674b5..000000000000
--- a/sdks/java/io/kafka/kafka-241/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="2.4.1"
-  undelimited="241"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-251/build.gradle b/sdks/java/io/kafka/kafka-251/build.gradle
deleted file mode 100644
index 4de9f97a738a..000000000000
--- a/sdks/java/io/kafka/kafka-251/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="2.5.1"
-  undelimited="251"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-282/build.gradle b/sdks/java/io/kafka/kafka-282/build.gradle
deleted file mode 100644
index b754d93077e5..000000000000
--- a/sdks/java/io/kafka/kafka-282/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="2.8.2"
-  undelimited="282"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-312/build.gradle b/sdks/java/io/kafka/kafka-312/build.gradle
deleted file mode 100644
index af2ad3717b65..000000000000
--- a/sdks/java/io/kafka/kafka-312/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="3.1.2"
-  undelimited="312"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-390/build.gradle b/sdks/java/io/kafka/kafka-390/build.gradle
deleted file mode 100644
index 8c8821386265..000000000000
--- a/sdks/java/io/kafka/kafka-390/build.gradle
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-project.ext {
-  delimited="3.9.0"
-  undelimited="390"
-  sdfCompatible=true
-}
-
-apply from: "../kafka-integration-test.gradle"
\ No newline at end of file
diff --git a/sdks/java/io/kafka/kafka-201/build.gradle b/sdks/java/io/kafka/kafka-391/build.gradle
similarity index 95%
rename from sdks/java/io/kafka/kafka-201/build.gradle
rename to sdks/java/io/kafka/kafka-391/build.gradle
index a26ca4ac19cf..d2f96debc782 100644
--- a/sdks/java/io/kafka/kafka-201/build.gradle
+++ b/sdks/java/io/kafka/kafka-391/build.gradle
@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 project.ext {
-  delimited="2.0.1"
-  undelimited="201"
+  delimited="3.9.1"
+  undelimited="391"
   sdfCompatible=true
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index a05abba06e75..341c15ba8f59 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -19,7 +19,6 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
-import java.io.Closeable;
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.time.Duration;
@@ -27,7 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
@@ -49,7 +48,6 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExpiringMemoizingSerializableSupplier;
 import org.apache.beam.sdk.util.MemoizingPerInstantiationSerializableSupplier;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.SerializableSupplier;
@@ -71,6 +69,7 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
@@ -235,30 +234,12 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
             CacheBuilder.newBuilder()
                 .concurrencyLevel(Runtime.getRuntime().availableProcessors())
                 .weakValues()
-                .removalListener(
-                    (RemovalNotification<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-                            notification) -> {
-                      final @Nullable KafkaLatestOffsetEstimator value;
-                      if (notification.getCause() == RemovalCause.COLLECTED
-                          && (value = notification.getValue()) != null) {
-                        value.close();
-                      }
-                    })
                 .build(
-                    new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
+                    new CacheLoader<KafkaSourceDescriptor, AtomicLong>() {
                       @Override
-                      public KafkaLatestOffsetEstimator load(
-                          final KafkaSourceDescriptor sourceDescriptor) {
-                        LOG.info(
-                            "Creating Kafka consumer for offset estimation for {}",
-                            sourceDescriptor);
-                        final Map<String, Object> config =
-                            KafkaIOUtils.overrideBootstrapServersConfig(
-                                consumerConfig, sourceDescriptor);
-                        final Consumer<byte[], byte[]> consumer =
-                            consumerFactoryFn.apply(config);
-                        return new KafkaLatestOffsetEstimator(
-                            consumer, sourceDescriptor.getTopicPartition());
+                      public AtomicLong load(final KafkaSourceDescriptor sourceDescriptor) {
+                        LOG.info("Creating end offset estimator for {}", sourceDescriptor);
+                        return new AtomicLong(Long.MIN_VALUE);
                       }
                     }));
     this.pollConsumerCacheSupplier =
@@ -319,8 +300,7 @@ public Consumer<byte[], byte[]> load(
 
   private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, MovingAvg>>
       avgRecordSizeCacheSupplier;
-  private final SerializableSupplier<
-          LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>>
+  private final SerializableSupplier<LoadingCache<KafkaSourceDescriptor, AtomicLong>>
       latestOffsetEstimatorCacheSupplier;
   private final SerializableSupplier<
           LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>>>
       pollConsumerCacheSupplier;
 
   private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, MovingAvg>
       avgRecordSizeCache;
-  private transient @MonotonicNonNull LoadingCache<
-          KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
+  private transient @MonotonicNonNull LoadingCache<KafkaSourceDescriptor, AtomicLong>
       latestOffsetEstimatorCache;
   private transient @MonotonicNonNull LoadingCache<
           KafkaSourceDescriptor, Consumer<byte[], byte[]>>
       pollConsumerCache;
@@ -349,46 +328,6 @@ public Consumer<byte[], byte[]> load(
   @VisibleForTesting
   static final String RAW_SIZE_METRIC_PREFIX = KafkaUnboundedReader.RAW_SIZE_METRIC_PREFIX;
 
-  /**
-   * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to
-   * fetch backlog.
-   */
-  private static class KafkaLatestOffsetEstimator
-      implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
-    private final Consumer<byte[], byte[]> offsetConsumer;
-    private final Supplier<Long> offsetSupplier;
-
-    KafkaLatestOffsetEstimator(
-        final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition topicPartition) {
-      this.offsetConsumer = offsetConsumer;
-      this.offsetSupplier =
-          new ExpiringMemoizingSerializableSupplier<>(
-              () -> {
-                try {
-                  return offsetConsumer
-                      .endOffsets(Collections.singleton(topicPartition))
-                      .getOrDefault(topicPartition, Long.MIN_VALUE);
-                } catch (Throwable t) {
-                  LOG.error("Failed to get end offset for {}", topicPartition, t);
-                  return Long.MIN_VALUE;
-                }
-              },
-              Duration.ofSeconds(1),
-              Long.MIN_VALUE,
-              Duration.ZERO);
-    }
-
-    @Override
-    public long estimate() {
-      return offsetSupplier.get();
-    }
-
-    @Override
-    public void close() {
-      offsetConsumer.close();
-    }
-  }
-
   @GetInitialRestriction
   @RequiresNonNull({"pollConsumerCache"})
   public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSourceDescriptor) {
@@ -500,8 +439,8 @@ public double getSize(
   @RequiresNonNull({"latestOffsetEstimatorCache"})
   public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
 
     if (restriction.getTo() < Long.MAX_VALUE) {
       return new UnsplittableRestrictionTracker<>(new OffsetRangeTracker(restriction));
@@ -510,9 +449,10 @@ public UnsplittableRestrictionTracker<OffsetRange, Long> restrictionTracker(
     }
 
     // OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
     // so we want to minimize the amount of connections that we start and track with Kafka. Another
     // point is that it has a memoized backlog, and this should make that more reusable estimations.
+    final AtomicLong latestOffsetEstimator =
+        latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor);
     return new UnsplittableRestrictionTracker<>(
-        new GrowableOffsetRangeTracker(
-            restriction.getFrom(), latestOffsetEstimatorCache.getUnchecked(kafkaSourceDescriptor)));
+        new GrowableOffsetRangeTracker(restriction.getFrom(), latestOffsetEstimator::get));
   }
 
   @ProcessElement
@@ -525,14 +465,13 @@ public ProcessContinuation processElement(
       throws Exception {
     final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         this.avgRecordSizeCache;
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
     final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
         this.pollConsumerCache;
 
     final MovingAvg avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
-    final KafkaLatestOffsetEstimator latestOffsetEstimator =
-        latestOffsetEstimatorCache.get(kafkaSourceDescriptor);
+    final AtomicLong latestOffsetEstimator = latestOffsetEstimatorCache.get(kafkaSourceDescriptor);
     final Consumer<byte[], byte[]> consumer = pollConsumerCache.get(kafkaSourceDescriptor);
     final Deserializer<K> keyDeserializerInstance =
         Preconditions.checkStateNotNull(this.keyDeserializerInstance);
@@ -580,6 +519,14 @@ public ProcessContinuation processElement(
         // Fetch the next records.
         final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remainingTimeout);
         final Duration elapsed = pollTimer.elapsed();
+        try {
+          final long position = consumer.position(topicPartition);
+          consumer
+              .currentLag(topicPartition)
+              .ifPresent(lag -> latestOffsetEstimator.lazySet(position + lag));
+        } catch (KafkaException e) {
+        }
+
         try {
           remainingTimeout = remainingTimeout.minus(elapsed);
         } catch (ArithmeticException e) {
@@ -687,7 +634,7 @@ public ProcessContinuation processElement(
 
           final long estimatedBacklogBytes =
               (long)
-                  (BigDecimal.valueOf(latestOffsetEstimator.estimate())
+                  (BigDecimal.valueOf(latestOffsetEstimator.get())
                           .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                           .doubleValue()
                       * avgRecordSize.get());
@@ -752,8 +699,8 @@ public void setup() throws Exception {
   public void teardown() throws Exception {
     final LoadingCache<KafkaSourceDescriptor, MovingAvg> avgRecordSizeCache =
         this.avgRecordSizeCache;
-    final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
-        latestOffsetEstimatorCache = this.latestOffsetEstimatorCache;
+    final LoadingCache<KafkaSourceDescriptor, AtomicLong> latestOffsetEstimatorCache =
+        this.latestOffsetEstimatorCache;
     final LoadingCache<KafkaSourceDescriptor, Consumer<byte[], byte[]>> pollConsumerCache =
         this.pollConsumerCache;
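The net effect of the ReadFromKafkaDoFn changes above: the dedicated offset-estimation consumer (one extra broker connection per topic-partition) is gone, and the cached estimator is now a plain AtomicLong refreshed after every poll from consumer.position(tp) + consumer.currentLag(tp). currentLag (KIP-695, Kafka 3.0+) is served from fetch metadata the consumer already holds, so the backlog estimate costs no additional round-trip. A standalone sketch of that bookkeeping, assuming a broker at localhost:9092 and a hypothetical demo-topic:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class LagBasedEndOffsetEstimate {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    props.put("group.id", "lag-estimate-demo");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical topic
    AtomicLong endOffsetEstimate = new AtomicLong(Long.MIN_VALUE);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singleton(tp));
      consumer.poll(Duration.ofSeconds(1));

      try {
        // currentLag is computed from the last poll's fetch metadata, so the
        // end offset falls out of position + lag with no extra broker call.
        long position = consumer.position(tp);
        consumer.currentLag(tp).ifPresent(lag -> endOffsetEstimate.lazySet(position + lag));
      } catch (KafkaException e) {
        // No valid position yet (nothing fetched); keep the previous estimate.
      }

      System.out.println("estimated end offset: " + endOffsetEstimate.get());
    }
  }
}

As in the diff, the estimate is only as fresh as the last poll, which is why the DoFn updates the AtomicLong inside its poll loop rather than on a timer, and why a KafkaException there is non-fatal: the previous estimate simply remains in place.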
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java
index c16e25510ab8..b64f5edabe76 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffsetTest.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -236,7 +236,7 @@ public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offse
     }
 
     @Override
-    public synchronized void close(long timeout, TimeUnit unit) {
+    public synchronized void close(Duration timeout) {
       // Ignore closing since we're using a single consumer.
     }
   }
diff --git a/sdks/java/testing/kafka-service/build.gradle b/sdks/java/testing/kafka-service/build.gradle
index abd186f98b1d..57e43db1cca9 100644
--- a/sdks/java/testing/kafka-service/build.gradle
+++ b/sdks/java/testing/kafka-service/build.gradle
@@ -28,6 +28,7 @@ ext.summary = """Self-contained Kafka service for testing IO transforms."""
 
 dependencies {
   testImplementation library.java.kafka
+  testImplementation library.java.kafka_server
   testImplementation "org.apache.zookeeper:zookeeper:3.5.6"
   testRuntimeOnly library.java.slf4j_log4j12
 }
diff --git a/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java b/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java
index 71ec61a3e41e..dc88f3fe0a5a 100644
--- a/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java
+++ b/sdks/java/testing/kafka-service/src/test/java/org/apache/beam/sdk/testing/kafka/LocalKafka.java
@@ -20,10 +20,10 @@
 import java.nio.file.Files;
 import java.util.Properties;
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 
 public class LocalKafka {
-  private final KafkaServerStartable server;
+  private final KafkaServer server;
 
   LocalKafka(int kafkaPort, int zookeeperPort) throws Exception {
     Properties kafkaProperties = new Properties();
@@ -31,7 +31,12 @@ public class LocalKafka {
     kafkaProperties.setProperty("zookeeper.connect", String.format("localhost:%s", zookeeperPort));
     kafkaProperties.setProperty("offsets.topic.replication.factor", "1");
     kafkaProperties.setProperty("log.dir", Files.createTempDirectory("kafka-log-").toString());
-    server = new KafkaServerStartable(KafkaConfig.fromProps(kafkaProperties));
+    server =
+        new KafkaServer(
+            KafkaConfig.fromProps(kafkaProperties),
+            KafkaServer.$lessinit$greater$default$2(),
+            KafkaServer.$lessinit$greater$default$3(),
+            KafkaServer.$lessinit$greater$default$4());
   }
 
   public void start() {
diff --git a/settings.gradle.kts b/settings.gradle.kts
index bea48565bfc4..6545fa7623f6 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -344,20 +344,8 @@ project(":beam-test-gha").projectDir = file(".github")
 include("beam-validate-runner")
 project(":beam-validate-runner").projectDir = file(".test-infra/validate-runner")
 include("com.google.api.gax.batching")
-include("sdks:java:io:kafka:kafka-390")
-findProject(":sdks:java:io:kafka:kafka-390")?.name = "kafka-390"
-include("sdks:java:io:kafka:kafka-312")
-findProject(":sdks:java:io:kafka:kafka-312")?.name = "kafka-312"
-include("sdks:java:io:kafka:kafka-282")
-findProject(":sdks:java:io:kafka:kafka-282")?.name = "kafka-282"
-include("sdks:java:io:kafka:kafka-251")
-findProject(":sdks:java:io:kafka:kafka-251")?.name = "kafka-251"
-include("sdks:java:io:kafka:kafka-241")
-findProject(":sdks:java:io:kafka:kafka-241")?.name = "kafka-241"
-include("sdks:java:io:kafka:kafka-231")
-findProject(":sdks:java:io:kafka:kafka-231")?.name = "kafka-231"
-include("sdks:java:io:kafka:kafka-201")
-findProject(":sdks:java:io:kafka:kafka-201")?.name = "kafka-201"
+include("sdks:java:io:kafka:kafka-391")
+findProject(":sdks:java:io:kafka:kafka-391")?.name = "kafka-391"
 include("sdks:java:managed")
 findProject(":sdks:java:managed")?.name = "managed"
 include("sdks:java:io:iceberg")
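One last compatibility detail, from KafkaCommitOffsetTest: Consumer.close(long, TimeUnit) was deprecated in Kafka 2.0 and removed in 3.0, so the test double's @Override no longer compiles against 3.9.1 until it targets close(Duration). A minimal sketch of the updated override, assuming a MockConsumer-based test double (class name is illustrative):

import java.time.Duration;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Keeps a consumer shared across test cases alive: close(Duration) is the only
// timed close overload left in Kafka 3.x, so it is the one to intercept.
public class SharedMockConsumer extends MockConsumer<byte[], byte[]> {
  public SharedMockConsumer() {
    super(OffsetResetStrategy.EARLIEST);
  }

  @Override
  public synchronized void close(Duration timeout) {
    // Intentionally a no-op; the single consumer is reused.
  }
}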