
Commit b652544

Partially revert "Replace end offset consumer with admin client"
This reverts commit 0d06b5e.
1 parent: 5b14dfd · commit: b652544

File tree

8 files changed (+102, −307 lines)


sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 0 additions & 35 deletions
@@ -114,7 +114,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -605,7 +604,6 @@ public static <K, V> Read<K, V> read() {
     return new AutoValue_KafkaIO_Read.Builder<K, V>()
         .setTopics(new ArrayList<>())
         .setTopicPartitions(new ArrayList<>())
-        .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN)
         .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
@@ -697,9 +695,6 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable Coder<V> getValueCoder();
 
-    @Pure
-    public abstract SerializableFunction<Map<String, Object>, Admin> getAdminFactoryFn();
-
     @Pure
     public abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         getConsumerFactoryFn();
@@ -783,9 +778,6 @@ abstract static class Builder<K, V> {
 
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
 
-      abstract Builder<K, V> setAdminFactoryFn(
-          SerializableFunction<Map<String, Object>, Admin> adminFactoryFn);
-
       abstract Builder<K, V> setConsumerFactoryFn(
          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
 
@@ -869,7 +861,6 @@ static <K, V> void setupExternalBuilder(
 
      // Set required defaults
      builder.setTopicPartitions(Collections.emptyList());
-      builder.setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN);
      builder.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN);
      if (config.maxReadTime != null) {
        builder.setMaxReadTime(Duration.standardSeconds(config.maxReadTime));
@@ -1315,15 +1306,6 @@ public Read<K, V> withValueDeserializerProviderAndCoder(
          .build();
    }
 
-    /**
-     * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful
-     * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}.
-     */
-    public Read<K, V> withAdminFactoryFn(
-        SerializableFunction<Map<String, Object>, Admin> adminFactoryFn) {
-      return toBuilder().setAdminFactoryFn(adminFactoryFn).build();
-    }
-
    /**
     * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
     * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
@@ -1981,7 +1963,6 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
        ReadSourceDescriptors.<K, V>read()
            .withConsumerConfigOverrides(kafkaRead.getConsumerConfig())
            .withOffsetConsumerConfigOverrides(kafkaRead.getOffsetConsumerConfig())
-            .withAdminFactoryFn(kafkaRead.getAdminFactoryFn())
            .withConsumerFactoryFn(kafkaRead.getConsumerFactoryFn())
            .withKeyDeserializerProviderAndCoder(
                kafkaRead.getKeyDeserializerProvider(), keyCoder)
@@ -2498,9 +2479,6 @@ public abstract static class ReadSourceDescriptors<K, V>
    @Pure
    abstract @Nullable Coder<V> getValueCoder();
 
-    @Pure
-    abstract SerializableFunction<Map<String, Object>, Admin> getAdminFactoryFn();
-
    @Pure
    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
        getConsumerFactoryFn();
@@ -2551,9 +2529,6 @@ abstract static class Builder<K, V> {
      abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
          @Nullable Map<String, Object> offsetConsumerConfig);
 
-      abstract ReadSourceDescriptors.Builder<K, V> setAdminFactoryFn(
-          SerializableFunction<Map<String, Object>, Admin> adminFactoryFn);
-
      abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
 
@@ -2608,7 +2583,6 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
 
    public static <K, V> ReadSourceDescriptors<K, V> read() {
      return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
-          .setAdminFactoryFn(KafkaIOUtils.KAFKA_ADMIN_FACTORY_FN)
          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
          .setCommitOffsetEnabled(false)
@@ -2709,15 +2683,6 @@ public ReadSourceDescriptors<K, V> withValueDeserializerProviderAndCoder(
          .build();
    }
 
-    /**
-     * A factory to create Kafka {@link Admin} from offset consumer configuration. This is useful
-     * for supporting another version of Kafka admin. Default is {@link Admin#create(Map)}.
-     */
-    public ReadSourceDescriptors<K, V> withAdminFactoryFn(
-        SerializableFunction<Map<String, Object>, Admin> adminFactoryFn) {
-      return toBuilder().setAdminFactoryFn(adminFactoryFn).build();
-    }
-
    /**
     * A factory to create Kafka {@link Consumer} from consumer configuration. This is useful for
     * supporting another version of Kafka consumer. Default is {@link KafkaConsumer}.
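
Note: with withAdminFactoryFn reverted on both Read and ReadSourceDescriptors, the consumer factory is again the only extension point for how KafkaIO creates Kafka clients, including the consumer used for end-offset estimation. The following is a minimal sketch (not part of this commit) of supplying a custom factory through the API that remains; the bootstrap servers, topic name, and client.id value are illustrative.

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CustomConsumerFactoryExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KafkaRecord<Long, String>> records =
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092") // illustrative address
                .withTopic("events") // illustrative topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // With withAdminFactoryFn gone, the consumer factory is the remaining hook
                // for customizing client creation; here it only tags a client.id before
                // delegating to the regular KafkaConsumer constructor. The incoming config
                // is assumed to already carry the byte-array deserializer settings that
                // KafkaIO's default consumer properties provide.
                .withConsumerFactoryFn(
                    config -> {
                      Map<String, Object> copy = new HashMap<>(config);
                      copy.put(ConsumerConfig.CLIENT_ID_CONFIG, "beam-kafkaio-reader");
                      return new KafkaConsumer<>(copy);
                    }));

    p.run().waitUntilFinish();
  }
}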

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java

Lines changed: 0 additions & 1 deletion
@@ -83,7 +83,6 @@ enum KafkaIOReadProperties {
   TOPIC_PATTERN,
   KEY_CODER,
   VALUE_CODER,
-  ADMIN_FACTORY_FN,
   CONSUMER_FACTORY_FN,
   WATERMARK_FN(LEGACY),
   MAX_NUM_RECORDS(LEGACY) {

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java

Lines changed: 0 additions & 5 deletions
@@ -28,7 +28,6 @@
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -77,10 +76,6 @@ public final class KafkaIOUtils {
          // lets allow these, applications can have better resume point for restarts.
          );
 
-  // Default Kafka Admin supplier.
-  static final SerializableFunction<Map<String, Object>, Admin> KAFKA_ADMIN_FACTORY_FN =
-      Admin::create;
-
  // default Kafka 0.9 Consumer supplier.
  static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
      KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;
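
Note: the default factory that remains after the revert is simply KafkaConsumer::new, which relies on the config map carrying byte-array deserializer settings. A minimal sketch of an equivalent factory written out long-hand (the class name is illustrative, not part of Beam) that pins the deserializers in the constructor instead:

import java.util.Map;

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Illustrative stand-in for the KAFKA_CONSUMER_FACTORY_FN default: builds a raw
// byte[]/byte[] consumer from the supplied config map, but passes the deserializers
// to the constructor rather than expecting them in the map.
public class ExplicitConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    return new KafkaConsumer<>(
        config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
  }
}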

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

Lines changed: 24 additions & 35 deletions
@@ -67,14 +67,10 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
@@ -206,8 +202,6 @@ private static class Bounded<K, V> extends ReadFromKafkaDoFn<K, V> {
   private ReadFromKafkaDoFn(
       ReadSourceDescriptors<K, V> transform,
       TupleTag<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>> recordTag) {
-    final SerializableFunction<Map<String, Object>, Admin> adminFactoryFn =
-        transform.getAdminFactoryFn();
     final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn =
         transform.getConsumerFactoryFn();
     this.consumerConfig = transform.getConsumerConfig();
@@ -256,14 +250,15 @@ public MovingAvg load(KafkaSourceDescriptor kafkaSourceDescriptor)
                  public KafkaLatestOffsetEstimator load(
                      final KafkaSourceDescriptor sourceDescriptor) {
                    LOG.info(
-                        "Creating Kafka admin for offset estimation for {}",
+                        "Creating Kafka consumer for offset estimation for {}",
                        sourceDescriptor);
                    final Map<String, Object> config =
                        KafkaIOUtils.overrideBootstrapServersConfig(
                            consumerConfig, sourceDescriptor);
-                    final Admin admin = adminFactoryFn.apply(config);
+                    final Consumer<byte[], byte[]> consumer =
+                        consumerFactoryFn.apply(config);
                    return new KafkaLatestOffsetEstimator(
-                        admin, sourceDescriptor.getTopicPartition());
+                        consumer, sourceDescriptor.getTopicPartition());
                  }
                }));
    this.pollConsumerCacheSupplier =
@@ -360,43 +355,37 @@ public Consumer<byte[], byte[]> load(
   */
  private static class KafkaLatestOffsetEstimator
      implements GrowableOffsetRangeTracker.RangeEndEstimator, Closeable {
-    private static final ListOffsetsResult.ListOffsetsResultInfo DEFAULT_RESULT =
-        new ListOffsetsResult.ListOffsetsResultInfo(
-            Long.MIN_VALUE, Long.MIN_VALUE, Optional.empty());
-
-    private final Admin admin;
-    private final Supplier<KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>
-        latestOffsetFutureSupplier;
-    private ListOffsetsResult.ListOffsetsResultInfo latestOffsetResult;
-
-    KafkaLatestOffsetEstimator(final Admin admin, final TopicPartition topicPartition) {
-      this.admin = admin;
-      this.latestOffsetFutureSupplier =
+    private final Consumer<byte[], byte[]> offsetConsumer;
+    private final Supplier<Long> offsetSupplier;
+
+    KafkaLatestOffsetEstimator(
+        final Consumer<byte[], byte[]> offsetConsumer, final TopicPartition topicPartition) {
+      this.offsetConsumer = offsetConsumer;
+      this.offsetSupplier =
          new ExpiringMemoizingSerializableSupplier<>(
-              () ->
-                  admin
-                      .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest()))
-                      .partitionResult(topicPartition),
+              () -> {
+                try {
+                  return offsetConsumer
+                      .endOffsets(Collections.singleton(topicPartition))
+                      .getOrDefault(topicPartition, Long.MIN_VALUE);
+                } catch (Throwable t) {
+                  LOG.error("Failed to get end offset for {}", topicPartition, t);
+                  return Long.MIN_VALUE;
+                }
+              },
              Duration.ofSeconds(1),
-              KafkaFuture.completedFuture(DEFAULT_RESULT),
+              Long.MIN_VALUE,
              Duration.ZERO);
-      this.latestOffsetResult = DEFAULT_RESULT;
    }
 
    @Override
    public long estimate() {
-      try {
-        latestOffsetResult = latestOffsetFutureSupplier.get().getNow(latestOffsetResult);
-      } catch (Throwable t) {
-        LOG.error("Failed to get latest offset", t);
-      }
-
-      return latestOffsetResult.offset();
+      return offsetSupplier.get();
    }
 
    @Override
    public void close() {
-      admin.close();
+      offsetConsumer.close();
    }
  }
 
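
Note: the restored KafkaLatestOffsetEstimator above derives the range-end estimate from Consumer.endOffsets instead of Admin.listOffsets, memoizes the result for roughly a second via the internal ExpiringMemoizingSerializableSupplier, reports Long.MIN_VALUE when the lookup fails, and closes the consumer when the estimator is closed. Below is a simplified, self-contained sketch of the same pattern without the Beam-internal supplier; the class and field names are illustrative, not Beam's.

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch: caches the latest end offset of a single partition, refreshing
// at most once per second, and reports Long.MIN_VALUE ("unknown") when the lookup fails.
public final class CachedEndOffsetEstimator implements Closeable {
  private static final Duration REFRESH_INTERVAL = Duration.ofSeconds(1);

  private final Consumer<byte[], byte[]> offsetConsumer;
  private final TopicPartition topicPartition;

  private long cachedEndOffset = Long.MIN_VALUE;
  private long lastRefreshNanos = Long.MIN_VALUE;

  public CachedEndOffsetEstimator(
      Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
    this.offsetConsumer = offsetConsumer;
    this.topicPartition = topicPartition;
  }

  /** Returns the most recently observed end offset, refreshing it if the cache is stale. */
  public long estimate() {
    long now = System.nanoTime();
    if (lastRefreshNanos == Long.MIN_VALUE
        || now - lastRefreshNanos >= REFRESH_INTERVAL.toNanos()) {
      try {
        // endOffsets returns, per partition, the offset of the next record to be written.
        Map<TopicPartition, Long> endOffsets =
            offsetConsumer.endOffsets(Collections.singleton(topicPartition));
        cachedEndOffset = endOffsets.getOrDefault(topicPartition, Long.MIN_VALUE);
      } catch (RuntimeException e) {
        // Report "unknown" rather than failing the reader, mirroring the error handling
        // in the restored estimator.
        cachedEndOffset = Long.MIN_VALUE;
      }
      lastRefreshNanos = now;
    }
    return cachedEndOffset;
  }

  @Override
  public void close() {
    offsetConsumer.close();
  }
}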
