diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 3e5ff2637650..9f064f2432bc 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -129,8 +129,6 @@ dependencies { testImplementation library.java.google_cloud_dataflow_java_proto_library_all testImplementation library.java.jackson_dataformat_yaml testImplementation library.java.mockito_inline - testImplementation project(":sdks:java:io:kafka") - testImplementation library.java.kafka_clients validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest") validatesRunner project(path: project.path, configuration: "testRuntimeMigration") validatesRunner library.java.hamcrest diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 7d0a151b48b9..775e7b91de93 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -659,10 +659,7 @@ private List getOverrides(boolean streaming) { try { overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE); - overridesBuilder.add( - PTransformOverride.of( - KafkaReadWithRedistributeOverride.matcher(), - new KafkaReadWithRedistributeOverride.Factory())); + overridesBuilder.add(KafkaIO.Read.KAFKA_REDISTRIBUTE_OVERRIDE); } catch (NoClassDefFoundError e) { // Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java // and only needed when KafkaIO is used in the pipeline. diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 48e4ae2317ac..ad5535517646 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1817,6 +1817,13 @@ private boolean runnerPrefersLegacyRead(PipelineOptions options) { return true; } + /** A {@link PTransformOverride} for runners to override redistributed Kafka Read transforms. */ + @Internal + public static final PTransformOverride KAFKA_REDISTRIBUTE_OVERRIDE = + PTransformOverride.of( + KafkaReadWithRedistributeOverride.matcher(), + new KafkaReadWithRedistributeOverride.Factory<>()); + /** * A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka * read if runners doesn't have a good support on executing unbounded Splittable DoFn. diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java similarity index 90% rename from runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java rename to sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java index 89f0eef9b8cc..f8ebaaed56b7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverride.java @@ -15,11 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow; +package org.apache.beam.sdk.io.kafka; import java.util.Map; -import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.io.kafka.KafkaRecord; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformMatcher; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -48,8 +46,8 @@ public boolean matches(AppliedPTransform application) { } /** - * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code - * withOffsetDeduplication} when {@code withRedistribute} is enabled. + * {@link PTransformOverrideFactory} for {@link org.apache.beam.sdk.io.kafka.KafkaIO.Read} that + * enables {@code withOffsetDeduplication} when {@code withRedistribute} is enabled. */ static class Factory implements PTransformOverrideFactory< diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java similarity index 98% rename from runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java rename to sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java index 05e5dd6a55d1..4301aa92ec8f 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadWithRedistributeOverrideTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow; +package org.apache.beam.sdk.io.kafka; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.nullValue; @@ -25,7 +25,6 @@ import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.kafka.KafkaIO; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.TestPipeline; @@ -129,5 +128,6 @@ public void leaveCompositeTransform(Node node) { } }; p.traverseTopologically(visitor); + p.enableAbandonedNodeEnforcement(false); } }