Skip to content

Commit 9628c1e

Browse files
authored
[KafkaIO] Improve caching in backlog estimation and processing (#34331)
* Improve caching in backlog estimation and processing * Add comment to explain the behavior of volatile guard field in KafkaLatestOffsetEstimator * Guard against exceptions in endOffset refresh * Call cancelIfTimeouted in roundtripElements to shutdown lingering pipelines * Add missing calls to seek and/or pause before return points added in #34202
1 parent 363bcc0 commit 9628c1e

File tree

4 files changed

+422
-322
lines changed

4 files changed

+422
-322
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.kafka;
1919

2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2122

2223
import java.nio.charset.StandardCharsets;
2324
import java.util.HashMap;
@@ -130,6 +131,20 @@ static Map<String, Object> getOffsetConsumerConfig(
130131
return offsetConsumerConfig;
131132
}
132133

134+
static Map<String, Object> overrideBootstrapServersConfig(
135+
Map<String, Object> currentConfig, KafkaSourceDescriptor description) {
136+
checkState(
137+
currentConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)
138+
|| description.getBootStrapServers() != null);
139+
Map<String, Object> config = new HashMap<>(currentConfig);
140+
if (description.getBootStrapServers() != null && description.getBootStrapServers().size() > 0) {
141+
config.put(
142+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
143+
String.join(",", description.getBootStrapServers()));
144+
}
145+
return config;
146+
}
147+
133148
/*
134149
* Maintains approximate average over last 1000 elements.
135150
* Usage is only thread-safe for a single producer and multiple consumers.

0 commit comments

Comments
 (0)