Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ dependencies {
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_inline
testImplementation project(":sdks:java:io:kafka")
testImplementation library.java.kafka_clients
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
validatesRunner library.java.hamcrest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ private List<PTransformOverride> getOverrides(boolean streaming) {

try {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
overridesBuilder.add(
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, move this override into KafkaIO, like KafkaIO.Read.KAFKA_READ_OVERRIDE above. That keeps DataflowRunner's references to KafkaIO to a minimum (which eases imports).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry about that. @tomstepp can you send a follow-up PR to move this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, yeah I can update this, thanks for identifying the improvement @Abacn !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} catch (NoClassDefFoundError e) {
// Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
// and only needed when KafkaIO is used in the pipeline.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"PatternMatchingInstanceof"
})
public final class KafkaReadWithRedistributeOverride {

private KafkaReadWithRedistributeOverride() {}

public static PTransformMatcher matcher() {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (application.getTransform() instanceof KafkaIO.Read) {
return ((KafkaIO.Read) application.getTransform()).isRedistributed();
}
return false;
}
};
}

/**
* {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
* withOffsetDeduplication} when {@code withRedistribute} is enabled.
*/
static class Factory<K, V>
implements PTransformOverrideFactory<
PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> {

@Override
public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> transform) {
KafkaIO.Read<K, V> read = transform.getTransform();
if (read.getOffsetDeduplication() == null) {
return PTransformReplacement.of(
transform.getPipeline().begin(), read.withOffsetDeduplication(true));
}
return PTransformReplacement.of(transform.getPipeline().begin(), read);
}

@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class KafkaReadWithRedistributeOverrideTest implements Serializable {
@Rule public transient TestPipeline p = TestPipeline.create();

@Test
public void testOverrideAppliedWhenRedistributeEnabled() {
p.apply(
"MatchingRead",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_match")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withRedistribute());
p.apply(
"NoRedistribute",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_no_redistribute")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class));
p.apply(
"ExplicitlyDisable",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_disabled")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withOffsetDeduplication(false));
p.apply(
"ExplicitlyEnable",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_enabled")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withRedistribute()
.withOffsetDeduplication(true));

p.replaceAll(
Collections.singletonList(
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory<>())));

Pipeline.PipelineVisitor visitor =
new Pipeline.PipelineVisitor.Defaults() {

private boolean matchingVisited = false;
private boolean noRedistributeVisited = false;
private boolean explicitlyDisabledVisited = false;
private boolean explicitlyEnabledVisited = false;

@Override
public CompositeBehavior enterCompositeTransform(Node node) {
if (node.getTransform() instanceof KafkaIO.Read) {
KafkaIO.Read<?, ?> read = (KafkaIO.Read<?, ?>) node.getTransform();
if (read.getTopics().contains("test_match")) {
assertThat(read.isRedistributed(), is(true));
assertThat(read.getOffsetDeduplication(), is(true));
matchingVisited = true;
} else if (read.getTopics().contains("test_no_redistribute")) {
assertThat(read.isRedistributed(), is(false));
assertThat(read.getOffsetDeduplication(), nullValue());
noRedistributeVisited = true;
} else if (read.getTopics().contains("test_disabled")) {
assertThat(read.isRedistributed(), is(false));
assertThat(read.getOffsetDeduplication(), is(false));
explicitlyDisabledVisited = true;
} else if (read.getTopics().contains("test_enabled")) {
assertThat(read.isRedistributed(), is(true));
assertThat(read.getOffsetDeduplication(), is(true));
explicitlyEnabledVisited = true;
}
}
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
public void leaveCompositeTransform(Node node) {
if (node.isRootNode()) {
assertThat("Matching transform was not visited", matchingVisited, is(true));
assertThat(
"No redistribute transform was not visited", noRedistributeVisited, is(true));
assertThat(
"Explicitly disabled transform was not visited",
explicitlyDisabledVisited,
is(true));
assertThat(
"Explicitly enabled transform was not visited",
explicitlyEnabledVisited,
is(true));
}
}
};
p.traverseTopologically(visitor);
}
}
Loading