From 8cdbaf756822b1f97df7fb5aa80b79e1c364d5c8 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Tue, 18 Nov 2025 11:01:56 -0800 Subject: [PATCH 01/11] Add kafka read override to Dataflow java runner. --- .../beam/runners/dataflow/DataflowRunner.java | 4 + .../KafkaReadWithRedistributeOverride.java | 74 +++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7e23182042c9..c09f7d5820db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -659,6 +659,10 @@ private List getOverrides(boolean streaming) { try { overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); + overridesBuilder.add( + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory())); } catch (NoClassDefFoundError e) { // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java // and only needed when KafkaIO is used in the pipeline. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java new file mode 100644 index 000000000000..2fdcacb306f1 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import java.util.Map; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.io.kafka.KafkaRecord; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.runners.PTransformMatcher; +import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.sdk.util.construction.ReplacementOutputs; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness", // TODO(https://github.com/apache/beam/issues/20497) + "PatternMatchingInstanceof" +}) +public final class KafkaReadWithRedistributeOverride { + + private KafkaReadWithRedistributeOverride() {} + + public static PTransformMatcher matcher() { + return new PTransformMatcher() { + @Override + public boolean matches(AppliedPTransform application) { + if (application.getTransform() instanceof KafkaIO.Read) { + return ((KafkaIO.Read) application.getTransform()).isRedistributed(); + } + return false; + } + }; + } + + /** + * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code + * withOffsetDeduplication} when {@code withRedistribute} is enabled. + */ + static class Factory + implements PTransformOverrideFactory< + PBegin, PCollection>, KafkaIO.Read> { + + @Override + public PTransformReplacement>> getReplacementTransform( + AppliedPTransform>, KafkaIO.Read> transform) { + KafkaIO.Read read = transform.getTransform(); + return PTransformReplacement.of( + transform.getPipeline().begin(), read.withOffsetDeduplication(true)); + } + + @Override + public Map, ReplacementOutput> mapOutputs( + Map, PCollection> outputs, PCollection> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } +} From cf76c1ced24bee7203cf47883ec9127772ac676a Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Tue, 18 Nov 2025 11:41:09 -0800 Subject: [PATCH 02/11] Fix spot bugs (spacing) --- .../apache/beam/runners/dataflow/DataflowRunner.java | 6 +++--- .../dataflow/KafkaReadWithRedistributeOverride.java | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index c09f7d5820db..7d0a151b48b9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -660,9 +660,9 @@ private List getOverrides(boolean streaming) { try { overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); overridesBuilder.add( - PTransformOverride.of( - KafkaReadWithRedistributeOverride.matcher(), - new KafkaReadWithRedistributeOverride.Factory())); + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory())); } catch (NoClassDefFoundError e) { // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java // and only needed when KafkaIO is used in the pipeline. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java index 2fdcacb306f1..4bc3de16a36d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java @@ -54,20 +54,20 @@ public boolean matches(AppliedPTransform application) { * withOffsetDeduplication} when {@code withRedistribute} is enabled. */ static class Factory - implements PTransformOverrideFactory< - PBegin, PCollection>, KafkaIO.Read> { + implements PTransformOverrideFactory< + PBegin, PCollection>, KafkaIO.Read> { @Override public PTransformReplacement>> getReplacementTransform( - AppliedPTransform>, KafkaIO.Read> transform) { + AppliedPTransform>, KafkaIO.Read> transform) { KafkaIO.Read read = transform.getTransform(); return PTransformReplacement.of( - transform.getPipeline().begin(), read.withOffsetDeduplication(true)); + transform.getPipeline().begin(), read.withOffsetDeduplication(true)); } @Override public Map, ReplacementOutput> mapOutputs( - Map, PCollection> outputs, PCollection> newOutput) { + Map, PCollection> outputs, PCollection> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } From 346158f4ba270057933994d8908194622b326fbd Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Tue, 18 Nov 2025 12:13:01 -0800 Subject: [PATCH 03/11] Add unit test of redistribute override --- ...KafkaReadWithRedistributeOverrideTest.java | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java new file mode 100644 index 000000000000..0d2e7901d374 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import java.io.Serializable; +import java.util.Collections; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.runners.PTransformOverride; +import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class KafkaReadWithRedistributeOverrideTest implements Serializable { + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void testOverrideAppliedWhenRedistributeEnabled() { + p.apply( + "MatchingRead", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_match") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withRedistribute()); + p.apply( + "NonMatchingRead", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_nomatch") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class)); + + p.replaceAll( + Collections.singletonList( + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory<>()))); + + Pipeline.PipelineVisitor visitor = + new Pipeline.PipelineVisitor.Defaults() { + + private boolean matchingVisited = false; + private boolean nonMatchingVisited = false; + + @Override + public CompositeBehavior enterCompositeTransform(Node node) { + if (node.getTransform() instanceof KafkaIO.Read) { + KafkaIO.Read read = (KafkaIO.Read) node.getTransform(); + if (read.getTopics().contains("test_match")) { + assertThat(read.isRedistributed(), is(true)); + assertThat(read.getOffsetDeduplication(), is(true)); + matchingVisited = true; + } else if (read.getTopics().contains("test_nomatch")) { + assertThat(read.isRedistributed(), is(false)); + assertThat(read.getOffsetDeduplication(), nullValue()); + nonMatchingVisited = true; + } + } + return CompositeBehavior.ENTER_TRANSFORM; + } + + @Override + public void leaveCompositeTransform(Node node) { + if (node.isRootNode()) { + assertThat("Matching transform was not visited", matchingVisited, is(true)); + assertThat("Non-matching transform was not visited", nonMatchingVisited, is(true)); + } + } + }; + p.traverseTopologically(visitor); + } +} From 6335075ca20ef395d9f5b1544684371c8998044e Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Tue, 18 Nov 2025 12:21:59 -0800 Subject: [PATCH 04/11] Update test dependencies via gradle --- runners/google-cloud-dataflow-java/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 415132fa7d2c..0961a385b214 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -129,6 +129,8 @@ dependencies { testImplementation library.java.google_cloud_dataflow_java_proto_library_all testImplementation library.java.jackson_dataformat_yaml testImplementation library.java.mockito_inline + testImplementation project(":sdks:java:io:kafka") + testImplementation library.java.kafka_clients validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: project.path, configuration: "testRuntimeMigration") validatesRunner library.java.hamcrest From 1cdd6bf36e3a454d33ba64e2dd110109a2144808 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Wed, 19 Nov 2025 08:09:36 -0800 Subject: [PATCH 05/11] Add logic and test case for explicitly disabled. --- .../KafkaReadWithRedistributeOverride.java | 7 +++-- ...KafkaReadWithRedistributeOverrideTest.java | 30 +++++++++++++++---- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java index 4bc3de16a36d..f8e84c0b1025 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java @@ -61,8 +61,11 @@ static class Factory public PTransformReplacement>> getReplacementTransform( AppliedPTransform>, KafkaIO.Read> transform) { KafkaIO.Read read = transform.getTransform(); - return PTransformReplacement.of( - transform.getPipeline().begin(), read.withOffsetDeduplication(true)); + if (read.getOffsetDeduplication() == null) { + return PTransformReplacement.of( + transform.getPipeline().begin(), read.withOffsetDeduplication(true)); + } + return PTransformReplacement.of(transform.getPipeline().begin(), read); } @Override diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java index 0d2e7901d374..09ef470fd0e6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java @@ -49,12 +49,20 @@ public void testOverrideAppliedWhenRedistributeEnabled() { .withValueDeserializer(StringDeserializer.class) .withRedistribute()); p.apply( - "NonMatchingRead", + "NoRedistribute", KafkaIO.read() .withBootstrapServers("localhost:9092") - .withTopic("test_nomatch") + .withTopic("test_no_redistribute") .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class)); + p.apply( + "ExplicitlyDisable", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_disable") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withOffsetDeduplication(false)); p.replaceAll( Collections.singletonList( @@ -66,7 +74,8 @@ public void testOverrideAppliedWhenRedistributeEnabled() { new Pipeline.PipelineVisitor.Defaults() { private boolean matchingVisited = false; - private boolean nonMatchingVisited = false; + private boolean noRedistributeVisited = false; + private boolean explicitlyDisabledVisited = false; @Override public CompositeBehavior enterCompositeTransform(Node node) { @@ -76,10 +85,14 @@ public CompositeBehavior enterCompositeTransform(Node node) { assertThat(read.isRedistributed(), is(true)); assertThat(read.getOffsetDeduplication(), is(true)); matchingVisited = true; - } else if (read.getTopics().contains("test_nomatch")) { + } else if (read.getTopics().contains("test_no_redistribute")) { assertThat(read.isRedistributed(), is(false)); assertThat(read.getOffsetDeduplication(), nullValue()); - nonMatchingVisited = true; + noRedistributeVisited = true; + } else if (read.getTopics().contains("test_disable")) { + assertThat(read.isRedistributed(), is(false)); + assertThat(read.getOffsetDeduplication(), is(false)); + explicitlyDisabledVisited = true; } } return CompositeBehavior.ENTER_TRANSFORM; @@ -89,7 +102,12 @@ public CompositeBehavior enterCompositeTransform(Node node) { public void leaveCompositeTransform(Node node) { if (node.isRootNode()) { assertThat("Matching transform was not visited", matchingVisited, is(true)); - assertThat("Non-matching transform was not visited", nonMatchingVisited, is(true)); + assertThat( + "No redistribute transform was not visited", noRedistributeVisited, is(true)); + assertThat( + "Explicitly disable transform was not visited", + explicitlyDisabledVisited, + is(true)); } } }; From 1a52da32b6ee9814396ead5efbdf82366f201ea2 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Wed, 19 Nov 2025 08:12:51 -0800 Subject: [PATCH 06/11] Add explicitly enabled test case --- ...KafkaReadWithRedistributeOverrideTest.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java index 09ef470fd0e6..c70272b910db 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java @@ -59,10 +59,19 @@ public void testOverrideAppliedWhenRedistributeEnabled() { "ExplicitlyDisable", KafkaIO.read() .withBootstrapServers("localhost:9092") - .withTopic("test_disable") + .withTopic("test_disabled") .withKeyDeserializer(StringDeserializer.class) .withValueDeserializer(StringDeserializer.class) .withOffsetDeduplication(false)); + p.apply( + "ExplicitlyEnable", + KafkaIO.read() + .withBootstrapServers("localhost:9092") + .withTopic("test_enabled") + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withRedistribute() + .withOffsetDeduplication(true)); p.replaceAll( Collections.singletonList( @@ -76,6 +85,7 @@ public void testOverrideAppliedWhenRedistributeEnabled() { private boolean matchingVisited = false; private boolean noRedistributeVisited = false; private boolean explicitlyDisabledVisited = false; + private boolean explicitlyEnabledVisited = false; @Override public CompositeBehavior enterCompositeTransform(Node node) { @@ -89,10 +99,14 @@ public CompositeBehavior enterCompositeTransform(Node node) { assertThat(read.isRedistributed(), is(false)); assertThat(read.getOffsetDeduplication(), nullValue()); noRedistributeVisited = true; - } else if (read.getTopics().contains("test_disable")) { + } else if (read.getTopics().contains("test_disabled")) { assertThat(read.isRedistributed(), is(false)); assertThat(read.getOffsetDeduplication(), is(false)); explicitlyDisabledVisited = true; + } else if (read.getTopics().contains("test_enabled")) { + assertThat(read.isRedistributed(), is(true)); + assertThat(read.getOffsetDeduplication(), is(true)); + explicitlyEnabledVisited = true; } } return CompositeBehavior.ENTER_TRANSFORM; @@ -105,9 +119,13 @@ public void leaveCompositeTransform(Node node) { assertThat( "No redistribute transform was not visited", noRedistributeVisited, is(true)); assertThat( - "Explicitly disable transform was not visited", + "Explicitly disabled transform was not visited", explicitlyDisabledVisited, is(true)); + assertThat( + "Explicitly enabled transform was not visited", + explicitlyEnabledVisited, + is(true)); } } }; From 67e3ca8bbe8ff5a61b63bda9e8361f382ffe668a Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Thu, 20 Nov 2025 09:01:06 -0800 Subject: [PATCH 07/11] Use boolean asserts over assertThat, assert each read is visited only once, refine suppressed lint warnings to just instanceof on matches method. --- .../KafkaReadWithRedistributeOverride.java | 8 ++-- ...KafkaReadWithRedistributeOverrideTest.java | 37 +++++++++---------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java index f8e84c0b1025..89f0eef9b8cc 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java @@ -28,17 +28,15 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness", // TODO(https://github.com/apache/beam/issues/20497) - "PatternMatchingInstanceof" -}) public final class KafkaReadWithRedistributeOverride { private KafkaReadWithRedistributeOverride() {} public static PTransformMatcher matcher() { return new PTransformMatcher() { + @SuppressWarnings({ + "PatternMatchingInstanceof" // For compiling on older Java versions. + }) @Override public boolean matches(AppliedPTransform application) { if (application.getTransform() instanceof KafkaIO.Read) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java index c70272b910db..05e5dd6a55d1 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java @@ -18,8 +18,9 @@ package org.apache.beam.runners.dataflow; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.Collections; @@ -92,20 +93,24 @@ public CompositeBehavior enterCompositeTransform(Node node) { if (node.getTransform() instanceof KafkaIO.Read) { KafkaIO.Read read = (KafkaIO.Read) node.getTransform(); if (read.getTopics().contains("test_match")) { - assertThat(read.isRedistributed(), is(true)); - assertThat(read.getOffsetDeduplication(), is(true)); + assertTrue(read.isRedistributed()); + assertTrue(read.getOffsetDeduplication()); + assertFalse(matchingVisited); matchingVisited = true; } else if (read.getTopics().contains("test_no_redistribute")) { - assertThat(read.isRedistributed(), is(false)); + assertFalse(read.isRedistributed()); assertThat(read.getOffsetDeduplication(), nullValue()); + assertFalse(noRedistributeVisited); noRedistributeVisited = true; } else if (read.getTopics().contains("test_disabled")) { - assertThat(read.isRedistributed(), is(false)); - assertThat(read.getOffsetDeduplication(), is(false)); + assertFalse(read.isRedistributed()); + assertFalse(read.getOffsetDeduplication()); + assertFalse(explicitlyDisabledVisited); explicitlyDisabledVisited = true; } else if (read.getTopics().contains("test_enabled")) { - assertThat(read.isRedistributed(), is(true)); - assertThat(read.getOffsetDeduplication(), is(true)); + assertTrue(read.isRedistributed()); + assertTrue(read.getOffsetDeduplication()); + assertFalse(explicitlyEnabledVisited); explicitlyEnabledVisited = true; } } @@ -115,17 +120,11 @@ public CompositeBehavior enterCompositeTransform(Node node) { @Override public void leaveCompositeTransform(Node node) { if (node.isRootNode()) { - assertThat("Matching transform was not visited", matchingVisited, is(true)); - assertThat( - "No redistribute transform was not visited", noRedistributeVisited, is(true)); - assertThat( - "Explicitly disabled transform was not visited", - explicitlyDisabledVisited, - is(true)); - assertThat( - "Explicitly enabled transform was not visited", - explicitlyEnabledVisited, - is(true)); + assertTrue("Matching transform was not visited", matchingVisited); + assertTrue("No redistribute transform was not visited", noRedistributeVisited); + assertTrue( + "Explicitly disabled transform was not visited", explicitlyDisabledVisited); + assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited); } } }; From 8e6972baec3679c2b726b23488ecca6f8b9f62a2 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Mon, 24 Nov 2025 09:08:49 -0800 Subject: [PATCH 08/11] Move kafka read with redistribute override to Kafka IO package. --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 5 +---- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 7 +++++++ .../sdk/io/kafka}/KafkaReadWithRedistributeOverride.java | 2 +- .../io/kafka}/KafkaReadWithRedistributeOverrideTest.java | 2 +- 4 files changed, 10 insertions(+), 6 deletions(-) rename {runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow => sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka}/KafkaReadWithRedistributeOverride.java (98%) rename {runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow => sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka}/KafkaReadWithRedistributeOverrideTest.java (99%) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7d0a151b48b9..775e7b91de93 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -659,10 +659,7 @@ private List getOverrides(boolean streaming) { try { overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); - overridesBuilder.add( - PTransformOverride.of( - KafkaReadWithRedistributeOverride.matcher(), - new KafkaReadWithRedistributeOverride.Factory())); + overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE); } catch (NoClassDefFoundError e) { // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java // and only needed when KafkaIO is used in the pipeline. diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 48e4ae2317ac..aaf9670858c7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1817,6 +1817,13 @@ private boolean runnerPrefersLegacyRead(PipelineOptions options) { return true; } + /** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms.*/ + @Internal + public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE = + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory()); + /** * A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka * read if runners doesn't have a good support on executing unbounded Splittable DoFn. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java similarity index 98% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java index 89f0eef9b8cc..e535889239b4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow; +package org.apache.beam.sdk.io.kafka; import java.util.Map; import org.apache.beam.sdk.io.kafka.KafkaIO; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java similarity index 99% rename from runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java rename to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java index 05e5dd6a55d1..989f760a8136 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow; +package org.apache.beam.sdk.io.kafka; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.nullValue; From f64b94f8aa78f8500599019bd0a8e142dccc3466 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Mon, 24 Nov 2025 09:30:08 -0800 Subject: [PATCH 09/11] Lint fixes --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 4 ++-- .../sdk/io/kafka/KafkaReadWithRedistributeOverride.java | 6 ++---- .../sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java | 1 - 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index aaf9670858c7..ad5535517646 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1817,12 +1817,12 @@ private boolean runnerPrefersLegacyRead(PipelineOptions options) { return true; } - /** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms.*/ + /** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms. */ @Internal public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE = PTransformOverride.of( KafkaReadWithRedistributeOverride.matcher(), - new KafkaReadWithRedistributeOverride.Factory()); + new KafkaReadWithRedistributeOverride.Factory<>()); /** * A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java index e535889239b4..f8ebaaed56b7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java @@ -18,8 +18,6 @@ package org.apache.beam.sdk.io.kafka; import java.util.Map; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -48,8 +46,8 @@ public boolean matches(AppliedPTransform application) { } /** - * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code - * withOffsetDeduplication} when {@code withRedistribute} is enabled. + * {@link PTransformOverrideFactory} for {@link org.apache.beam.sdk.io.kafka.KafkaIO.Read} that + * enables {@code withOffsetDeduplication} when {@code withRedistribute} is enabled. */ static class Factory implements PTransformOverrideFactory< diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java index 989f760a8136..c009c5673989 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java @@ -25,7 +25,6 @@ import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.TestPipeline; From 3e51cd8cfc9f33fd0ab13c5173eb62e2627c8427 Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Mon, 24 Nov 2025 10:04:33 -0800 Subject: [PATCH 10/11] Remove Kafka test dependencies for Dataflow worker --- runners/google-cloud-dataflow-java/build.gradle | 2 -- 1 file changed, 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 3e5ff2637650..9f064f2432bc 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -129,8 +129,6 @@ dependencies { testImplementation library.java.google_cloud_dataflow_java_proto_library_all testImplementation library.java.jackson_dataformat_yaml testImplementation library.java.mockito_inline - testImplementation project(":sdks:java:io:kafka") - testImplementation library.java.kafka_clients validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: project.path, configuration: "testRuntimeMigration") validatesRunner library.java.hamcrest From 368c4c1209ea29f75109f7482b1c8dd7b0466a1f Mon Sep 17 00:00:00 2001 From: Tom Stepp Date: Mon, 24 Nov 2025 11:24:50 -0800 Subject: [PATCH 11/11] Ignore abandoned nodes in the test since we just need to replace the transforms. --- .../beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java index c009c5673989..4301aa92ec8f 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java @@ -128,5 +128,6 @@ public void leaveCompositeTransform(Node node) { } }; p.traverseTopologically(visitor); + p.enableAbandonedNodeEnforcement(false); } }