Skip to content

Commit 1a52da3

Browse files
committed
Add explicitly enabled test case
1 parent 1cdd6bf commit 1a52da3

File tree

1 file changed

+21
-3
lines changed

1 file changed

+21
-3
lines changed

runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,19 @@ public void testOverrideAppliedWhenRedistributeEnabled() {
5959
"ExplicitlyDisable",
6060
KafkaIO.<String, String>read()
6161
.withBootstrapServers("localhost:9092")
62-
.withTopic("test_disable")
62+
.withTopic("test_disabled")
6363
.withKeyDeserializer(StringDeserializer.class)
6464
.withValueDeserializer(StringDeserializer.class)
6565
.withOffsetDeduplication(false));
66+
p.apply(
67+
"ExplicitlyEnable",
68+
KafkaIO.<String, String>read()
69+
.withBootstrapServers("localhost:9092")
70+
.withTopic("test_enabled")
71+
.withKeyDeserializer(StringDeserializer.class)
72+
.withValueDeserializer(StringDeserializer.class)
73+
.withRedistribute()
74+
.withOffsetDeduplication(true));
6675

6776
p.replaceAll(
6877
Collections.singletonList(
@@ -76,6 +85,7 @@ public void testOverrideAppliedWhenRedistributeEnabled() {
7685
private boolean matchingVisited = false;
7786
private boolean noRedistributeVisited = false;
7887
private boolean explicitlyDisabledVisited = false;
88+
private boolean explicitlyEnabledVisited = false;
7989

8090
@Override
8191
public CompositeBehavior enterCompositeTransform(Node node) {
@@ -89,10 +99,14 @@ public CompositeBehavior enterCompositeTransform(Node node) {
8999
assertThat(read.isRedistributed(), is(false));
90100
assertThat(read.getOffsetDeduplication(), nullValue());
91101
noRedistributeVisited = true;
92-
} else if (read.getTopics().contains("test_disable")) {
102+
} else if (read.getTopics().contains("test_disabled")) {
93103
assertThat(read.isRedistributed(), is(false));
94104
assertThat(read.getOffsetDeduplication(), is(false));
95105
explicitlyDisabledVisited = true;
106+
} else if (read.getTopics().contains("test_enabled")) {
107+
assertThat(read.isRedistributed(), is(true));
108+
assertThat(read.getOffsetDeduplication(), is(true));
109+
explicitlyEnabledVisited = true;
96110
}
97111
}
98112
return CompositeBehavior.ENTER_TRANSFORM;
@@ -105,9 +119,13 @@ public void leaveCompositeTransform(Node node) {
105119
assertThat(
106120
"No redistribute transform was not visited", noRedistributeVisited, is(true));
107121
assertThat(
108-
"Explicitly disable transform was not visited",
122+
"Explicitly disabled transform was not visited",
109123
explicitlyDisabledVisited,
110124
is(true));
125+
assertThat(
126+
"Explicitly enabled transform was not visited",
127+
explicitlyEnabledVisited,
128+
is(true));
111129
}
112130
}
113131
};

0 commit comments

Comments
 (0)