
Commit 89ae276

Fix: Dedup input list of Topics/TopicPartitions in KafkaIO (#35758)
1 parent 30dc617 commit 89ae276

3 files changed (+82 / -13 lines)

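The change touches the Read expansion path: before KafkaIO chooses between the legacy UnboundedSource and the SDF implementation, the configured topic (or topic-partition) list is now deduplicated. A minimal sketch of the user-facing scenario this guards against; the topic names, broker address, and deserializers below are illustrative and not taken from the commit:

    // A KafkaIO read whose topic list accidentally repeats an entry.
    PCollection<KafkaRecord<String, String>> records =
        p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopics(ImmutableList.of("orders", "payments", "orders")) // "orders" listed twice
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));
    // With this change, expand() rewrites the topic list to ["orders", "payments"]
    // before the underlying source is built.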

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 27 additions & 2 deletions
@@ -1583,6 +1583,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       final KafkaIOReadImplementationCompatibilityResult compatibility =
           KafkaIOReadImplementationCompatibility.getCompatibility(this);
 
+      Read<K, V> kafkaRead = deduplicateTopics(this);
+
       // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
       // Kafka source, for example,
       // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
@@ -1599,9 +1601,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
           || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
           || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
               && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
+        return input.apply(new ReadFromKafkaViaUnbounded<>(kafkaRead, keyCoder, valueCoder));
       }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      return input.apply(new ReadFromKafkaViaSDF<>(kafkaRead, keyCoder, valueCoder));
     }
 
     private void checkRedistributeConfiguration() {
@@ -1648,6 +1650,29 @@ private void warnAboutUnsafeConfigurations(PBegin input) {
       }
     }
 
+    private Read<K, V> deduplicateTopics(Read<K, V> kafkaRead) {
+      final List<String> topics = getTopics();
+      if (topics != null && !topics.isEmpty()) {
+        final List<String> distinctTopics = topics.stream().distinct().collect(Collectors.toList());
+        if (topics.size() == distinctTopics.size()) {
+          return kafkaRead;
+        }
+        return kafkaRead.toBuilder().setTopics(distinctTopics).build();
+      }
+
+      final List<TopicPartition> topicPartitions = getTopicPartitions();
+      if (topicPartitions != null && !topicPartitions.isEmpty()) {
+        final List<TopicPartition> distinctTopicPartitions =
+            topicPartitions.stream().distinct().collect(Collectors.toList());
+        if (topicPartitions.size() == distinctTopicPartitions.size()) {
+          return kafkaRead;
+        }
+        return kafkaRead.toBuilder().setTopicPartitions(distinctTopicPartitions).build();
+      }
+
+      return kafkaRead;
+    }
+
     // This class is designed to mimic the Flink pipeline options, so we can check for the
     // checkpointingInterval property, but without needing to depend on the Flink runner
     // Do not use this
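deduplicateTopics relies on Stream#distinct(), which keeps the first occurrence of each element (encounter order is preserved) and compares elements by equals()/hashCode(); that works both for String topic names and for Kafka's TopicPartition, whose equality is defined by topic name and partition number. A small standalone sketch of the same pattern outside Beam; the class name and values are illustrative:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.common.TopicPartition;

    public class DistinctTopicPartitionsSketch {
      public static void main(String[] args) {
        List<TopicPartition> partitions =
            Arrays.asList(
                new TopicPartition("topic_a", 0),
                new TopicPartition("topic_a", 1),
                new TopicPartition("topic_a", 0)); // equal to the first entry
        // distinct() drops the repeated element and keeps the original order.
        List<TopicPartition> deduped =
            partitions.stream().distinct().collect(Collectors.toList());
        System.out.println(deduped); // [topic_a-0, topic_a-1]
      }
    }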

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java

Lines changed: 2 additions & 1 deletion
@@ -116,7 +116,8 @@ private PipelineResult testReadTransformCreationWithImplementationBoundPropertie
             false, /*redistribute*/
             false, /*allowDuplicates*/
             0, /*numKeys*/
-            null /*offsetDeduplication*/)));
+            null, /*offsetDeduplication*/
+            null /*topics*/)));
     return p.run();
   }

sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java

Lines changed: 53 additions & 10 deletions
@@ -392,7 +392,8 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
         false, /*redistribute*/
         false, /*allowDuplicates*/
         0, /*numKeys*/
-        null /*offsetDeduplication*/);
+        null, /*offsetDeduplication*/
+        null /*topics*/);
   }
 
   static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithOffsetDedup(
@@ -404,7 +405,23 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithOffsetDedup(
         true, /*redistribute*/
         false, /*allowDuplicates*/
         100, /*numKeys*/
-        true /*offsetDeduplication*/);
+        true, /*offsetDeduplication*/
+        null /*topics*/);
+  }
+
+  static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithTopics(
+      int numElements,
+      @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
+      List<String> topics) {
+    return mkKafkaReadTransform(
+        numElements,
+        numElements,
+        timestampFn,
+        false, /*redistribute*/
+        false, /*allowDuplicates*/
+        0, /*numKeys*/
+        null, /*offsetDeduplication*/
+        topics /*topics*/);
   }
 
   /**
@@ -418,15 +435,21 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       @Nullable Boolean redistribute,
       @Nullable Boolean withAllowDuplicates,
       @Nullable Integer numKeys,
-      @Nullable Boolean offsetDeduplication) {
+      @Nullable Boolean offsetDeduplication,
+      @Nullable List<String> topics) {
 
     KafkaIO.Read<Integer, Long> reader =
         KafkaIO.<Integer, Long>read()
             .withBootstrapServers(mkKafkaServers)
-            .withTopics(mkKafkaTopics)
+            .withTopics(topics != null ? topics : mkKafkaTopics)
             .withConsumerFactoryFn(
                 new ConsumerFactoryFn(
-                    mkKafkaTopics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+                    topics != null
+                        ? topics.stream().distinct().collect(Collectors.toList())
+                        : mkKafkaTopics,
+                    10,
+                    numElements,
+                    OffsetResetStrategy.EARLIEST)) // 20 partitions
             .withKeyDeserializer(IntegerDeserializer.class)
             .withValueDeserializer(LongDeserializer.class);
     if (maxNumRecords != null) {
@@ -648,6 +671,21 @@ public void testUnboundedSource() {
     p.run();
   }
 
+  @Test
+  public void testUnboundedSourceWithDuplicateTopics() {
+    int numElements = 1000;
+    List<String> topics = ImmutableList.of("topic_a", "topic_b", "topic_a");
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransformWithTopics(numElements, new ValueAsTimestampFn(), topics)
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
   @Test
   public void testRiskyConfigurationWarnsProperly() {
     int numElements = 1000;
@@ -682,7 +720,8 @@ public void warningsWithAllowDuplicatesEnabledAndCommitOffsets() {
                 true, /*redistribute*/
                 true, /*allowDuplicates*/
                 0, /*numKeys*/
-                null /*offsetDeduplication*/)
+                null, /*offsetDeduplication*/
+                null /*topics*/)
             .commitOffsetsInFinalize()
             .withConsumerConfigUpdates(
                 ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
@@ -709,7 +748,8 @@ public void noWarningsWithNoAllowDuplicatesAndCommitOffsets() {
                 true, /*redistribute*/
                 false, /*allowDuplicates*/
                 0, /*numKeys*/
-                null /*offsetDeduplication*/)
+                null, /*offsetDeduplication*/
+                null /*topics*/)
            .commitOffsetsInFinalize()
            .withConsumerConfigUpdates(
                ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, "group_id"))
@@ -737,7 +777,8 @@ public void testNumKeysIgnoredWithRedistributeNotEnabled() {
                 false, /*redistribute*/
                 false, /*allowDuplicates*/
                 0, /*numKeys*/
-                null /*offsetDeduplication*/)
+                null, /*offsetDeduplication*/
+                null /*topics*/)
            .withRedistributeNumKeys(100)
            .commitOffsetsInFinalize()
            .withConsumerConfigUpdates(
@@ -2109,7 +2150,8 @@ public void testUnboundedSourceStartReadTime() {
                 false, /*redistribute*/
                 false, /*allowDuplicates*/
                 0, /*numKeys*/
-                null /*offsetDeduplication*/)
+                null, /*offsetDeduplication*/
+                null /*topics*/)
            .withStartReadTime(new Instant(startTime))
            .withoutMetadata())
        .apply(Values.create());
@@ -2154,7 +2196,8 @@ public void testUnboundedSourceStartReadTimeException() {
                 false, /*redistribute*/
                 false, /*allowDuplicates*/
                 0, /*numKeys*/
-                null /*offsetDeduplication*/)
+                null, /*offsetDeduplication*/
+                null /*topics*/)
            .withStartReadTime(new Instant(startTime))
            .withoutMetadata())
        .apply(Values.create());
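The added test exercises duplicate topic names; the same deduplication also covers explicit partition lists, since deduplicateTopics falls through to getTopicPartitions(). A hedged sketch of that case; the broker address, types, and deserializers are illustrative and this snippet is not part of the commit:

    KafkaIO.Read<Integer, Long> read =
        KafkaIO.<Integer, Long>read()
            .withBootstrapServers("broker:9092")
            .withTopicPartitions(
                ImmutableList.of(
                    new TopicPartition("topic_a", 0),
                    new TopicPartition("topic_b", 0),
                    new TopicPartition("topic_a", 0))) // duplicate partition
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class);
    // On expansion, the duplicate topic_a-0 entry is dropped before the source is built.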
