diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 415132fa7d2c..0961a385b214 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -129,6 +129,8 @@ dependencies { testImplementation library.java.google_cloud_dataflow_java_proto_library_all testImplementation library.java.jackson_dataformat_yaml testImplementation library.java.mockito_inline + testImplementation project(":sdks:java:io:kafka") + testImplementation library.java.kafka_clients validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: project.path, configuration: "testRuntimeMigration") validatesRunner library.java.hamcrest diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7e23182042c9..7d0a151b48b9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -659,6 +659,10 @@ private List getOverrides(boolean streaming) { try { overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); + overridesBuilder.add( + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory())); } catch (NoClassDefFoundError e) { // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java // and only needed when KafkaIO is used in the pipeline. 
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java new file mode 100644 index 000000000000..89f0eef9b8cc --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow; + +import java.util.Map; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.io.kafka.KafkaRecord; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.util.construction.ReplacementOutputs; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +/** Matcher and replacement factory for {@link KafkaIO.Read} transforms that report {@code isRedistributed()}; holds only static members and cannot be instantiated. */ public final class KafkaReadWithRedistributeOverride { + + private KafkaReadWithRedistributeOverride() {} + + /** Returns a {@link PTransformMatcher} that accepts an application only when its transform is a {@link KafkaIO.Read} whose {@code isRedistributed()} is true. */ public static PTransformMatcher matcher() { + return new PTransformMatcher() { + @SuppressWarnings({ + "PatternMatchingInstanceof" // For compiling on older Java versions. + }) + @Override + public boolean matches(AppliedPTransform application) { + if (application.getTransform() instanceof KafkaIO.Read) { + return ((KafkaIO.Read) application.getTransform()).isRedistributed(); + } + return false; + } + }; + } + + /** + * {@link PTransformOverrideFactory} for redistributed {@link KafkaIO.Read}s: applies {@code + * withOffsetDeduplication(true)} unless {@code getOffsetDeduplication()} is already non-null. 
+ */ + static class Factory + implements PTransformOverrideFactory< + PBegin, PCollection>, KafkaIO.Read> { + + /** Returns a replacement rooted at {@code transform.getPipeline().begin()} that carries the read with {@code withOffsetDeduplication(true)} applied when {@code getOffsetDeduplication()} is null, otherwise the read unchanged. */ @Override + public PTransformReplacement>> getReplacementTransform( + AppliedPTransform>, KafkaIO.Read> transform) { + KafkaIO.Read read = transform.getTransform(); + if (read.getOffsetDeduplication() == null) { + return PTransformReplacement.of( + transform.getPipeline().begin(), read.withOffsetDeduplication(true)); + } + /* A non-null value is kept as is, whether it is true or false. */ return PTransformReplacement.of(transform.getPipeline().begin(), read); + } + + /** Delegates to {@link ReplacementOutputs#singleton} with {@code outputs} and {@code newOutput}. */ @Override + public Map, ReplacementOutput> mapOutputs( + Map, PCollection> outputs, PCollection> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } +} diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java new file mode 100644 index 000000000000..05e5dd6a55d1 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.Serializable; +import java.util.Collections; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link KafkaReadWithRedistributeOverride}. */ @RunWith(JUnit4.class) +public class KafkaReadWithRedistributeOverrideTest implements Serializable { + @Rule public transient TestPipeline p = TestPipeline.create(); + + /** Applies four reads (redistribute only; neither option; offset deduplication explicitly false; redistribute with offset deduplication explicitly true), runs the override, then checks the flags of each read while traversing the pipeline. */ @Test + public void testOverrideAppliedWhenRedistributeEnabled() { + p.apply( + "MatchingRead", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_match") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withRedistribute()); + p.apply( + "NoRedistribute", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_no_redistribute") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class)); + p.apply( + "ExplicitlyDisable", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_disabled") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withOffsetDeduplication(false)); + p.apply( + "ExplicitlyEnable", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_enabled") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withRedistribute() + .withOffsetDeduplication(true)); + + /* Replace with the same matcher and factory pair under test. */ p.replaceAll( 
+ Collections.singletonList( + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory<>()))); + + Pipeline.PipelineVisitor visitor = + new Pipeline.PipelineVisitor.Defaults() { + + /* One flag per read; each is set on the first visit and checked when leaving the root. */ private boolean matchingVisited = false; + private boolean noRedistributeVisited = false; + private boolean explicitlyDisabledVisited = false; + private boolean explicitlyEnabledVisited = false; + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (node.getTransform() instanceof KafkaIO.Read) { + KafkaIO.Read read = (KafkaIO.Read) node.getTransform(); + /* Reads are told apart by topic; the assertFalse on each visited flag rejects a second visit. */ if (read.getTopics().contains("test_match")) { + assertTrue(read.isRedistributed()); + /* Deduplication was left unset on this read, so the override is expected to have enabled it. */ assertTrue(read.getOffsetDeduplication()); + assertFalse(matchingVisited); + matchingVisited = true; + } else if (read.getTopics().contains("test_no_redistribute")) { + assertFalse(read.isRedistributed()); + /* Not redistributed, so the matcher does not select it and the value stays null. */ assertThat(read.getOffsetDeduplication(), nullValue()); + assertFalse(noRedistributeVisited); + noRedistributeVisited = true; + } else if (read.getTopics().contains("test_disabled")) { + assertFalse(read.isRedistributed()); + assertFalse(read.getOffsetDeduplication()); + assertFalse(explicitlyDisabledVisited); + explicitlyDisabledVisited = true; + } else if (read.getTopics().contains("test_enabled")) { + assertTrue(read.isRedistributed()); + assertTrue(read.getOffsetDeduplication()); + assertFalse(explicitlyEnabledVisited); + explicitlyEnabledVisited = true; + } + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(Node node) { + /* Checked only on the root node: all four reads must have been visited by then. */ if (node.isRootNode()) { + assertTrue("Matching transform was not visited", matchingVisited); + assertTrue("No redistribute transform was not visited", noRedistributeVisited); + assertTrue( + "Explicitly disabled transform was not visited", explicitlyDisabledVisited); + assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited); + } + } + }; + 
p.traverseTopologically(visitor); + } +}