Skip to content

Commit fd9ca40

Browse files
committed
Apply offset consumer config overrides to admin client config
1 parent 7412f58 commit fd9ca40

File tree

1 file changed

+11
-1
lines changed

1 file changed

+11
-1
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.math.MathContext;
2525
import java.time.Duration;
2626
import java.util.Collections;
27+
import java.util.HashMap;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Optional;
@@ -210,6 +211,15 @@ private ReadFromKafkaDoFn(
210211
transform.getAdminFactoryFn();
211212
final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn =
212213
transform.getConsumerFactoryFn();
214+
final @Nullable Map<String, Object> offsetConsumerConfigOverrides =
215+
transform.getOffsetConsumerConfig();
216+
final Map<String, Object> offsetConsumerConfig;
217+
if (offsetConsumerConfigOverrides == null) {
218+
offsetConsumerConfig = transform.getConsumerConfig();
219+
} else {
220+
offsetConsumerConfig = new HashMap<>(transform.getConsumerConfig());
221+
offsetConsumerConfig.putAll(offsetConsumerConfigOverrides);
222+
}
213223
this.consumerConfig = transform.getConsumerConfig();
214224
this.keyDeserializerProvider =
215225
Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
@@ -260,7 +270,7 @@ public KafkaLatestOffsetEstimator load(
260270
sourceDescriptor);
261271
final Map<String, Object> config =
262272
KafkaIOUtils.overrideBootstrapServersConfig(
263-
consumerConfig, sourceDescriptor);
273+
offsetConsumerConfig, sourceDescriptor);
264274
final Admin admin = adminFactoryFn.apply(config);
265275
return new KafkaLatestOffsetEstimator(
266276
admin, sourceDescriptor.getTopicPartition());

0 commit comments

Comments
 (0)