Skip to content

Commit 6a82448

Browse files
authored
Move Kafka read with redistribute override to Kafka IO package. (#36887)
* Add kafka read override to Dataflow java runner.
* Fix spot bugs (spacing)
* Add unit test of redistribute override
* Update test dependencies via gradle
* Add logic and test case for explicitly disabled.
* Add explicitly enabled test case
* Use boolean asserts over assertThat, assert each read is visited only once, refine suppressed lint warnings to just instanceof on matches method.
* Move kafka read with redistribute override to Kafka IO package.
* Lint fixes
* Remove Kafka test dependencies for Dataflow worker
* Ignore abandoned nodes in the test since we just need to replace the transforms.
1 parent f9f13c3 commit 6a82448

File tree

5 files changed

+13
-13
lines changed

5 files changed

+13
-13
lines changed

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,6 @@ dependencies {
129129
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
130130
testImplementation library.java.jackson_dataformat_yaml
131131
testImplementation library.java.mockito_inline
132-
testImplementation project(":sdks:java:io:kafka")
133-
testImplementation library.java.kafka_clients
134132
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
135133
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
136134
validatesRunner library.java.hamcrest

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -659,10 +659,7 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
659659

660660
try {
661661
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
662-
overridesBuilder.add(
663-
PTransformOverride.of(
664-
KafkaReadWithRedistributeOverride.matcher(),
665-
new KafkaReadWithRedistributeOverride.Factory()));
662+
overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE);
666663
} catch (NoClassDefFoundError e) {
667664
// Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
668665
// and only needed when KafkaIO is used in the pipeline.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,6 +1817,13 @@ private boolean runnerPrefersLegacyRead(PipelineOptions options) {
18171817
return true;
18181818
}
18191819

1820+
/** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms. */
1821+
@Internal
1822+
public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE =
1823+
PTransformOverride.of(
1824+
KafkaReadWithRedistributeOverride.matcher(),
1825+
new KafkaReadWithRedistributeOverride.Factory<>());
1826+
18201827
/**
18211828
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
18221829
* read if the runner doesn't have good support for executing unbounded Splittable DoFn.

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java renamed to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,9 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.runners.dataflow;
18+
package org.apache.beam.sdk.io.kafka;
1919

2020
import java.util.Map;
21-
import org.apache.beam.sdk.io.kafka.KafkaIO;
22-
import org.apache.beam.sdk.io.kafka.KafkaRecord;
2321
import org.apache.beam.sdk.runners.AppliedPTransform;
2422
import org.apache.beam.sdk.runners.PTransformMatcher;
2523
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -48,8 +46,8 @@ public boolean matches(AppliedPTransform<?, ?, ?> application) {
4846
}
4947

5048
/**
51-
* {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
52-
* withOffsetDeduplication} when {@code withRedistribute} is enabled.
49+
* {@link PTransformOverrideFactory} for {@link org.apache.beam.sdk.io.kafka.KafkaIO.Read} that
50+
* enables {@code withOffsetDeduplication} when {@code withRedistribute} is enabled.
5351
*/
5452
static class Factory<K, V>
5553
implements PTransformOverrideFactory<

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java renamed to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.beam.runners.dataflow;
18+
package org.apache.beam.sdk.io.kafka;
1919

2020
import static org.hamcrest.MatcherAssert.assertThat;
2121
import static org.hamcrest.Matchers.nullValue;
@@ -25,7 +25,6 @@
2525
import java.io.Serializable;
2626
import java.util.Collections;
2727
import org.apache.beam.sdk.Pipeline;
28-
import org.apache.beam.sdk.io.kafka.KafkaIO;
2928
import org.apache.beam.sdk.runners.PTransformOverride;
3029
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
3130
import org.apache.beam.sdk.testing.TestPipeline;
@@ -129,5 +128,6 @@ public void leaveCompositeTransform(Node node) {
129128
}
130129
};
131130
p.traverseTopologically(visitor);
131+
p.enableAbandonedNodeEnforcement(false);
132132
}
133133
}

0 commit comments

Comments
 (0)