
Commit 5a00d73

Partially revert "Replace end offset consumer with admin client"
This reverts commit 0d06b5e.
1 parent b1224f8 commit 5a00d73
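
Note: the reverted change had switched KafkaIO's backlog (end-offset) estimation from the consumer's endOffsets call to the Admin client's listOffsets call; this commit restores the consumer-based path while keeping the rest of the original change. The following standalone sketch (not part of this commit) contrasts the two ways of reading a partition's end offset with the plain Kafka clients API; the broker address, topic name, and partition number are placeholders.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetSketch {
  public static void main(String[] args) throws Exception {
    TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition

    // Consumer-based lookup: the path this revert restores inside KafkaIO.
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
    consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
      long endOffset = consumer.endOffsets(Collections.singleton(tp)).getOrDefault(tp, -1L);
      System.out.println("endOffsets: " + endOffset);
    }

    // Admin-based lookup: the path introduced by the commit being reverted.
    try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
      ListOffsetsResult.ListOffsetsResultInfo info =
          admin
              .listOffsets(Collections.singletonMap(tp, OffsetSpec.latest()))
              .partitionResult(tp)
              .get();
      System.out.println("listOffsets: " + info.offset());
    }
  }
}

Both lookups report the same end offset; what the revert changes is which client object KafkaIO constructs, caches, and closes per source descriptor, as shown in the ReadFromKafkaDoFn diff below.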

8 files changed (+102, -307 lines)


sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 0 additions & 35 deletions
@@ -112,7 +112,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -603,7 +602,6 @@ public static <K, V> Read<K, V> read() {
     return new AutoValue_KafkaIO_Read.Builder<K, V>()
         .setTopics(new ArrayList<>())
         .setTopicPartitions(new ArrayList<>())
-        .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN)
         .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
@@ -695,9 +693,6 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable Coder<V> getValueCoder();
 
-    @Pure
-    public abstract SerializableFunction<Map<String, Object>, Admin> getAdminFactoryFn();
-
     @Pure
     public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         getConsumerFactoryFn();
@@ -781,9 +776,6 @@ abstract static class Builder<K, V> {
 
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
 
-      abstract Builder<K, V> setAdminFactoryFn(
-          SerializableFunction<Map<String, Object>, Admin> adminFactoryFn);
-
       abstract Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
 
@@ -867,7 +859,6 @@ static <K, V> void setupExternalBuilder(
 
       // Set required defaults
       builder.setTopicPartitions(Collections.emptyList());
-      builder.setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN);
       builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);
       if (config.maxReadTime != null) {
         builder.setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
@@ -1275,15 +1266,6 @@ public Read<K, V> withValueDeserializerProviderAndCoder(
           .build();
     }
 
-    /**
-     * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful
-     * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}.
-     */
-    public Read<K, V> withAdminFactoryFn(
-        SerializableFunction<Map<String, Object>, Admin> adminFactoryFn) {
-      return toBuilder().setAdminFactoryFn(adminFactoryFn).build();
-    }
-
     /**
      * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
      * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
@@ -1934,7 +1916,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
           ReadSourceDescriptors.<K, V>read()
               .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
               .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
-              .withAdminFactoryFn(kafkaRead.getAdminFactoryFn())
               .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
               .withKeyDeserializerProviderAndCoder(
                   kafkaRead.getKeyDeserializerProvider(), keyCoder)
@@ -2451,9 +2432,6 @@ public abstract static class ReadSourceDescriptors<K, V>
     @Pure
     abstract @Nullable Coder<V> getValueCoder();
 
-    @Pure
-    abstract SerializableFunction<Map<String, Object>, Admin> getAdminFactoryFn();
-
     @Pure
     abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         getConsumerFactoryFn();
@@ -2504,9 +2482,6 @@ abstract static class Builder<K, V> {
       abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
           @Nullable Map<String, Object> offsetConsumerConfig);
 
-      abstract ReadSourceDescriptors.Builder<K, V> setAdminFactoryFn(
-          SerializableFunction<Map<String, Object>, Admin> adminFactoryFn);
-
       abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
 
@@ -2561,7 +2536,6 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
 
     public static <K, V> ReadSourceDescriptors<K, V> read() {
       return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
-          .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN)
          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
          .setCommitOffsetEnabled(false)
@@ -2662,15 +2636,6 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
           .build();
     }
 
-    /**
-     * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful
-     * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}.
-     */
-    public ReadSourceDescriptors<K, V> withAdminFactoryFn(
-        SerializableFunction<Map<String, Object>, Admin> adminFactoryFn) {
-      return toBuilder().setAdminFactoryFn(adminFactoryFn).build();
-    }
-
     /**
      * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
      * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
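
With withAdminFactoryFn removed from both Read and ReadSourceDescriptors, the consumer factory is again the only hook for swapping in a different Kafka client, including the one KafkaIO creates internally for offset estimation. A hedged sketch of a pipeline overriding it follows; the broker address and topic name are placeholders, and the lambda shown is equivalent to the default KafkaConsumer::new factory.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFactorySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KafkaRecord<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092") // placeholder broker
                .withTopic("my-topic") // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // The factory receives KafkaIO's resolved consumer config and must
                // return a Consumer<byte[], byte[]>.
                .withConsumerFactoryFn(
                    (Map<String, Object> config) -> new KafkaConsumer<>(config)));

    p.run().waitUntilFinish();
  }
}

Because the factory is a SerializableFunction, it is serialized with the transform and invoked on workers wherever KafkaIO needs to construct a consumer.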

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 0 additions & 1 deletion
@@ -83,7 +83,6 @@ enum KafkaIOReadProperties {
   TOPIC_PATTERN,
   KEY_CODER,
   VALUE_CODER,
-  ADMIN_FACTORY_FN,
   CONSUMER_FACTORY_FN,
   WATERMARK_FN(LEGACY),
   MAX_NUM_RECORDS(LEGACY) {

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 0 additions & 5 deletions
@@ -28,7 +28,6 @@
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -77,10 +76,6 @@ public final class KafkaIOUtils {
           // lets allow these, applications can have better resume point for restarts.
           );
 
-  // Default Kafka Admin supplier.
-  static final SerializableFunction<Map<String, Object>, Admin> KAFKA_ADMIN_FACTORY_FN =
-      Admin::create;
-
   // default Kafka 0.9 Consumer supplier.
   static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
       KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;
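
The restored default is simply a serializable constructor reference. A standalone sketch of the same pattern is below; the nested SerializableFunction interface here is a stand-in for Beam's org.apache.beam.sdk.transforms.SerializableFunction (an assumption to keep the example dependency-free), and the broker address is a placeholder.

import java.io.Serializable;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DefaultFactorySketch {
  // Stand-in for Beam's SerializableFunction: a java.util.function.Function that is Serializable.
  interface SerializableFunction<InT, OutT> extends Function<InT, OutT>, Serializable {}

  // Mirrors the restored KafkaIOUtils default: wrap the KafkaConsumer constructor.
  static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
      CONSUMER_FACTORY = KafkaConsumer::new;

  public static void main(String[] args) {
    Map<String, Object> config =
        Map.of(
            "bootstrap.servers", "localhost:9092", // placeholder broker
            "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer",
            "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // Constructing a KafkaConsumer does not contact the broker, so this runs without a cluster.
    try (Consumer<byte[], byte[]> consumer = CONSUMER_FACTORY.apply(config)) {
      System.out.println("Created " + consumer.getClass().getSimpleName());
    }
  }
}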

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 24 additions & 35 deletions
@@ -66,14 +66,10 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
@@ -196,8 +192,6 @@ private static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
   private ReadFromKafkaDoFn(
       ReadSourceDescriptors<K, V> transform,
       TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> recordTag) {
-    final SerializableFunction<Map<String, Object>, Admin> adminFactoryFn =
-        transform.getAdminFactoryFn();
     final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn =
         transform.getConsumerFactoryFn();
     this.consumerConfig = transform.getConsumerConfig();
@@ -246,14 +240,15 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
                   public KafkaLatestOffsetEstimator load(
                       final KafkaSourceDescriptor sourceDescriptor) {
                     LOG.info(
-                        "Creating Kafka admin for offset estimation for {}",
+                        "Creating Kafka consumer for offset estimation for {}",
                         sourceDescriptor);
                     final Map<String, Object> config =
                         KafkaIOUtils.overrideBootstrapServersConfig(
                             consumerConfig, sourceDescriptor);
-                    final Admin admin = adminFactoryFn.apply(config);
+                    final Consumer<byte[], byte[]> consumer =
+                        consumerFactoryFn.apply(config);
                     return new KafkaLatestOffsetEstimator(
-                        admin, sourceDescriptor.getTopicPartition());
+                        consumer, sourceDescriptor.getTopicPartition());
                   }
                 }));
     this.pollConsumerCacheSupplier =
@@ -350,43 +345,37 @@ public Consumer<byte[], byte[]> load(
   */
  private static class KafkaLatestOffsetEstimator
      implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
-    private static final ListOffsetsResult.ListOffsetsResultInfo DEFAULT_RESULT =
-        new ListOffsetsResult.ListOffsetsResultInfo(
-            Long.MIN_VALUE, Long.MIN_VALUE, Optional.empty());
-
-    private final Admin admin;
-    private final Supplier<KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>
-        latestOffsetFutureSupplier;
-    private ListOffsetsResult.ListOffsetsResultInfo latestOffsetResult;
-
-    KafkaLatestOffsetEstimator(final Admin admin, final TopicPartition topicPartition) {
-      this.admin = admin;
-      this.latestOffsetFutureSupplier =
+    private final Consumer<byte[], byte[]> offsetConsumer;
+    private final Supplier<Long> offsetSupplier;
+
+    KafkaLatestOffsetEstimator(
+        final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition topicPartition) {
+      this.offsetConsumer = offsetConsumer;
+      this.offsetSupplier =
          new ExpiringMemoizingSerializableSupplier<>(
-              () ->
-                  admin
-                      .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest()))
-                      .partitionResult(topicPartition),
+              () -> {
+                try {
+                  return offsetConsumer
+                      .endOffsets(Collections.singleton(topicPartition))
+                      .getOrDefault(topicPartition, Long.MIN_VALUE);
+                } catch (Throwable t) {
+                  LOG.error("Failed to get end offset for {}", topicPartition, t);
+                  return Long.MIN_VALUE;
+                }
+              },
              Duration.ofSeconds(1),
-              KafkaFuture.completedFuture(DEFAULT_RESULT),
+              Long.MIN_VALUE,
              Duration.ZERO);
-      this.latestOffsetResult = DEFAULT_RESULT;
    }

    @Override
    public long estimate() {
-      try {
-        latestOffsetResult = latestOffsetFutureSupplier.get().getNow(latestOffsetResult);
-      } catch (Throwable t) {
-        LOG.error("Failed to get latest offset", t);
-      }
-
-      return latestOffsetResult.offset();
+      return offsetSupplier.get();
    }

    @Override
    public void close() {
-      admin.close();
+      offsetConsumer.close();
    }
  }
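
The restored estimator wraps the consumer's endOffsets lookup in Beam's internal ExpiringMemoizingSerializableSupplier so the range-end estimate is refreshed at most roughly once per second. The following self-contained sketch shows the same pattern without that Beam-internal helper; the class and field names are illustrative rather than the committed code, and a failed lookup keeps the previous estimate instead of caching Long.MIN_VALUE.

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class LatestOffsetEstimatorSketch implements Closeable {
  private final Consumer<byte[], byte[]> offsetConsumer;
  private final TopicPartition topicPartition;
  private final long ttlNanos = Duration.ofSeconds(1).toNanos();

  private long cachedEndOffset = Long.MIN_VALUE;
  private long cachedAtNanos = Long.MIN_VALUE;

  public LatestOffsetEstimatorSketch(
      Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
    this.offsetConsumer = offsetConsumer;
    this.topicPartition = topicPartition;
  }

  /** Returns the latest known end offset, querying the broker at most once per second. */
  public synchronized long estimate() {
    long now = System.nanoTime();
    if (cachedAtNanos == Long.MIN_VALUE || now - cachedAtNanos >= ttlNanos) {
      try {
        cachedEndOffset =
            offsetConsumer
                .endOffsets(Collections.singleton(topicPartition))
                .getOrDefault(topicPartition, Long.MIN_VALUE);
      } catch (RuntimeException e) {
        // Keep the previous value; the next call after the TTL expires will retry.
      }
      cachedAtNanos = now;
    }
    return cachedEndOffset;
  }

  @Override
  public void close() {
    offsetConsumer.close();
  }
}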
