Skip to content

Commit b1224f8

Browse files
committed
Revert "Apply offset consumer config overrides to admin client config"
This reverts commit 5f4bcbe.
1 parent 5f4bcbe commit b1224f8

File tree

1 file changed

+1
-11
lines changed

1 file changed

+1
-11
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.math.MathContext;
2525
import java.time.Duration;
2626
import java.util.Collections;
27-
import java.util.HashMap;
2827
import java.util.List;
2928
import java.util.Map;
3029
import java.util.Optional;
@@ -201,15 +200,6 @@ private ReadFromKafkaDoFn(
201200
transform.getAdminFactoryFn();
202201
final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn =
203202
transform.getConsumerFactoryFn();
204-
final @Nullable Map<String, Object> offsetConsumerConfigOverrides =
205-
transform.getOffsetConsumerConfig();
206-
final Map<String, Object> offsetConsumerConfig;
207-
if (offsetConsumerConfigOverrides == null) {
208-
offsetConsumerConfig = transform.getConsumerConfig();
209-
} else {
210-
offsetConsumerConfig = new HashMap<>(transform.getConsumerConfig());
211-
offsetConsumerConfig.putAll(offsetConsumerConfigOverrides);
212-
}
213203
this.consumerConfig = transform.getConsumerConfig();
214204
this.keyDeserializerProvider =
215205
Preconditions.checkArgumentNotNull(transform.getKeyDeserializerProvider());
@@ -260,7 +250,7 @@ public KafkaLatestOffsetEstimator load(
260250
sourceDescriptor);
261251
final Map<String, Object> config =
262252
KafkaIOUtils.overrideBootstrapServersConfig(
263-
offsetConsumerConfig, sourceDescriptor);
253+
consumerConfig, sourceDescriptor);
264254
final Admin admin = adminFactoryFn.apply(config);
265255
return new KafkaLatestOffsetEstimator(
266256
admin, sourceDescriptor.getTopicPartition());

0 commit comments

Comments
 (0)