Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ dependencies {
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.jackson_dataformat_yaml
testImplementation library.java.mockito_inline
testImplementation project(":sdks:java:io:kafka")
testImplementation library.java.kafka_clients
validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
validatesRunner library.java.hamcrest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ private List<PTransformOverride> getOverrides(boolean streaming) {

try {
overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
overridesBuilder.add(
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally move Override to KafkaIO as KafkaIO.Read.KAFKA_READ_OVERRIDE above. This keeps reference to KafkaIO for DataflowRunner minimum (ease import)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry about that. @tomstepp can you send a follow-up PR to move this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, yeah I can update this, thanks for identifying the improvement @Abacn !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} catch (NoClassDefFoundError e) {
// Do nothing. io-kafka is an optional dependency of runners-google-cloud-dataflow-java
// and only needed when KafkaIO is used in the pipeline.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public final class KafkaReadWithRedistributeOverride {

private KafkaReadWithRedistributeOverride() {}

public static PTransformMatcher matcher() {
return new PTransformMatcher() {
@SuppressWarnings({
"PatternMatchingInstanceof" // For compiling on older Java versions.
})
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
if (application.getTransform() instanceof KafkaIO.Read) {
return ((KafkaIO.Read) application.getTransform()).isRedistributed();
}
return false;
}
};
}

/**
* {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
* withOffsetDeduplication} when {@code withRedistribute} is enabled.
*/
static class Factory<K, V>
implements PTransformOverrideFactory<
PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> {

@Override
public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> transform) {
KafkaIO.Read<K, V> read = transform.getTransform();
if (read.getOffsetDeduplication() == null) {
return PTransformReplacement.of(
transform.getPipeline().begin(), read.withOffsetDeduplication(true));
}
return PTransformReplacement.of(transform.getPipeline().begin(), read);
}

@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class KafkaReadWithRedistributeOverrideTest implements Serializable {
@Rule public transient TestPipeline p = TestPipeline.create();

@Test
public void testOverrideAppliedWhenRedistributeEnabled() {
p.apply(
"MatchingRead",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_match")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withRedistribute());
p.apply(
"NoRedistribute",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_no_redistribute")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class));
p.apply(
"ExplicitlyDisable",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_disabled")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withOffsetDeduplication(false));
p.apply(
"ExplicitlyEnable",
KafkaIO.<String, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("test_enabled")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withRedistribute()
.withOffsetDeduplication(true));

p.replaceAll(
Collections.singletonList(
PTransformOverride.of(
KafkaReadWithRedistributeOverride.matcher(),
new KafkaReadWithRedistributeOverride.Factory<>())));

Pipeline.PipelineVisitor visitor =
new Pipeline.PipelineVisitor.Defaults() {

private boolean matchingVisited = false;
private boolean noRedistributeVisited = false;
private boolean explicitlyDisabledVisited = false;
private boolean explicitlyEnabledVisited = false;

@Override
public CompositeBehavior enterCompositeTransform(Node node) {
if (node.getTransform() instanceof KafkaIO.Read) {
KafkaIO.Read<?, ?> read = (KafkaIO.Read<?, ?>) node.getTransform();
if (read.getTopics().contains("test_match")) {
assertTrue(read.isRedistributed());
assertTrue(read.getOffsetDeduplication());
assertFalse(matchingVisited);
matchingVisited = true;
} else if (read.getTopics().contains("test_no_redistribute")) {
assertFalse(read.isRedistributed());
assertThat(read.getOffsetDeduplication(), nullValue());
assertFalse(noRedistributeVisited);
noRedistributeVisited = true;
} else if (read.getTopics().contains("test_disabled")) {
assertFalse(read.isRedistributed());
assertFalse(read.getOffsetDeduplication());
assertFalse(explicitlyDisabledVisited);
explicitlyDisabledVisited = true;
} else if (read.getTopics().contains("test_enabled")) {
assertTrue(read.isRedistributed());
assertTrue(read.getOffsetDeduplication());
assertFalse(explicitlyEnabledVisited);
explicitlyEnabledVisited = true;
}
}
return CompositeBehavior.ENTER_TRANSFORM;
}

@Override
public void leaveCompositeTransform(Node node) {
if (node.isRootNode()) {
assertTrue("Matching transform was not visited", matchingVisited);
assertTrue("No redistribute transform was not visited", noRedistributeVisited);
assertTrue(
"Explicitly disabled transform was not visited", explicitlyDisabledVisited);
assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited);
}
}
};
p.traverseTopologically(visitor);
}
}
Loading