Skip to content

Commit 5f4bcbe

Browse files
committed
Apply offset consumer config overrides to admin client config
1 parent 0d06b5e commit 5f4bcbe

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.math.MathContext;
2525
import java.time.Duration;
2626
import java.util.Collections;
27+
import java.util.HashMap;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Optional;
@@ -200,6 +201,15 @@ private ReadFromKafkaDoFn(
200201
transform.getAdminFactoryFn();
201202
final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn =
202203
transform.getConsumerFactoryFn();
204+
final @Nullable Map<String, Object> offsetConsumerConfigOverrides =
205+
transform.getOffsetConsumerConfig();
206+
final Map<String, Object> offsetConsumerConfig;
207+
if (offsetConsumerConfigOverrides == null) {
208+
offsetConsumerConfig = transform.getConsumerConfig();
209+
} else {
210+
offsetConsumerConfig = new HashMap<>(transform.getConsumerConfig());
211+
offsetConsumerConfig.putAll(offsetConsumerConfigOverrides);
212+
}
203213
this.consumerConfig = transform.getConsumerConfig();
204214
this.keyDeserializerProvider =
205215
Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
@@ -250,7 +260,7 @@ public KafkaLatestOffsetEstimator load(
250260
sourceDescriptor);
251261
final Map<String, Object> config =
252262
KafkaIOUtils.overrideBootstrapServersConfig(
253-
consumerConfig, sourceDescriptor);
263+
offsetConsumerConfig, sourceDescriptor);
254264
final Admin admin = adminFactoryFn.apply(config);
255265
return new KafkaLatestOffsetEstimator(
256266
admin, sourceDescriptor.getTopicPartition());

0 commit comments

Comments
 (0)