Skip to content

Commit 7e6a546

Browse files
committed
BE: Improved speed of consumer groups requets issue #1245
1 parent 4e7767b commit 7e6a546

File tree

4 files changed

+71
-30
lines changed

4 files changed

+71
-30
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ public class ClustersProperties {
4242

4343
CacheProperties cache = new CacheProperties();
4444

45+
AdminClient adminClient = new AdminClient();
46+
47+
@Data
48+
public static class AdminClient {
49+
Integer timeout;
50+
int describeConsumerGroupsPartitionSize = 50;
51+
int describeConsumerGroupsConcurrency = 4;
52+
int listConsumerGroupOffsetsPartitionSize = 50;
53+
int listConsumerGroupOffsetsConcurrency = 4;
54+
int getTopicsConfigPartitionSize = 200;
55+
int describeTopicsPartitionSize = 200;
56+
}
57+
4558
@Data
4659
public static class Cluster {
4760
@NotBlank(message = "field name for for cluster could not be blank")

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
2727

2828
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
2929
private final int clientTimeout;
30+
private final ClustersProperties clustersProperties;
3031

3132
public AdminClientServiceImpl(ClustersProperties clustersProperties) {
33+
this.clustersProperties = clustersProperties;
3234
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
3335
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
3436
}
@@ -53,7 +55,9 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
5355
);
5456
return AdminClient.create(properties);
5557
}).subscribeOn(Schedulers.boundedElastic())
56-
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
58+
.flatMap(ac -> ReactiveAdminClient.create(ac, clustersProperties.getAdminClient())
59+
.doOnError(th -> ac.close())
60+
)
5761
.onErrorMap(th -> new IllegalStateException(
5862
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
5963
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.Properties;
23+
import java.util.Set;
24+
import java.util.function.Predicate;
2325
import java.util.function.ToIntFunction;
2426
import java.util.stream.Collectors;
2527
import java.util.stream.Stream;
@@ -67,27 +69,46 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
6769
public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
6870
String topic) {
6971
return adminClientService.get(cluster)
70-
// 1. getting topic's end offsets
7172
.flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
72-
.flatMap(endOffsets -> {
73-
var tps = new ArrayList<>(endOffsets.keySet());
74-
// 2. getting all consumer groups
75-
return describeConsumerGroups(ac)
76-
.flatMap((List<ConsumerGroupDescription> groups) -> {
77-
// 3. trying to find committed offsets for topic
78-
var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
79-
return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
80-
groups.stream()
81-
// 4. keeping only groups that relates to topic
82-
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
83-
.map(g ->
84-
// 5. constructing results
85-
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
86-
.toList()
87-
);
88-
}
89-
);
90-
}));
73+
.flatMap(endOffsets ->
74+
describeConsumerGroups(ac).flatMap(groups ->
75+
filterConsumerGroups(ac, groups, topic, endOffsets)
76+
)
77+
)
78+
);
79+
}
80+
81+
private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
82+
ReactiveAdminClient ac,
83+
List<ConsumerGroupDescription> groups,
84+
String topic,
85+
Map<TopicPartition, Long> endOffsets) {
86+
List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());
87+
88+
Set<ConsumerGroupState> inactiveStates = Set.of(
89+
ConsumerGroupState.DEAD,
90+
ConsumerGroupState.EMPTY
91+
);
92+
93+
Map<Boolean, List<ConsumerGroupDescription>> partitioned = groups.stream().collect(
94+
Collectors.partitioningBy((g) -> !inactiveStates.contains(g.state()))
95+
);
96+
97+
List<ConsumerGroupDescription> stable = partitioned.get(true).stream()
98+
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, false))
99+
.toList();
100+
101+
List<ConsumerGroupDescription> filtered = new ArrayList<>();
102+
filtered.addAll(stable);
103+
filtered.addAll(partitioned.get(false));
104+
105+
List<String> groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList();
106+
return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets ->
107+
filtered.stream().filter(g ->
108+
isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId()))
109+
).map(g ->
110+
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets)
111+
).toList());
91112
}
92113

93114
private boolean isConsumerGroupRelatesToTopic(String topic,

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.google.common.collect.ImmutableTable;
1010
import com.google.common.collect.Iterables;
1111
import com.google.common.collect.Table;
12+
import io.kafbat.ui.config.ClustersProperties;
1213
import io.kafbat.ui.exception.IllegalEntityStateException;
1314
import io.kafbat.ui.exception.NotFoundException;
1415
import io.kafbat.ui.exception.ValidationException;
@@ -88,7 +89,6 @@
8889
import org.apache.kafka.common.quota.ClientQuotaAlteration;
8990
import org.apache.kafka.common.quota.ClientQuotaEntity;
9091
import org.apache.kafka.common.quota.ClientQuotaFilter;
91-
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
9292
import org.apache.kafka.common.resource.ResourcePatternFilter;
9393
import reactor.core.publisher.Flux;
9494
import reactor.core.publisher.Mono;
@@ -190,9 +190,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
190190
}
191191
}
192192

193-
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
193+
public static Mono<ReactiveAdminClient> create(AdminClient adminClient, ClustersProperties.AdminClient properties) {
194194
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
195-
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
195+
return configRelatedInfoMono.map(info ->
196+
new ReactiveAdminClient(adminClient, configRelatedInfoMono, properties, info)
197+
);
196198
}
197199

198200

@@ -235,6 +237,7 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
235237
@Getter(AccessLevel.PACKAGE) // visible for testing
236238
private final AdminClient client;
237239
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
240+
private final ClustersProperties.AdminClient properties;
238241

239242
private volatile ConfigRelatedInfo configRelatedInfo;
240243

@@ -280,7 +283,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
280283
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
281284
return partitionCalls(
282285
topicNames,
283-
200,
286+
properties.getGetTopicsConfigPartitionSize(),
284287
part -> getTopicsConfigImpl(part, includeDocFixed),
285288
mapMerger()
286289
);
@@ -348,7 +351,7 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
348351
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
349352
return partitionCalls(
350353
topics,
351-
200,
354+
properties.getDescribeTopicsPartitionSize(),
352355
this::describeTopicsImpl,
353356
mapMerger()
354357
);
@@ -517,8 +520,8 @@ public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
517520
public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
518521
return partitionCalls(
519522
groupIds,
520-
25,
521-
4,
523+
properties.getDescribeConsumerGroupsPartitionSize(),
524+
properties.getDescribeConsumerGroupsConcurrency(),
522525
ids -> toMono(client.describeConsumerGroups(ids).all()),
523526
mapMerger()
524527
);
@@ -541,8 +544,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S
541544

542545
Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
543546
consumerGroups,
544-
25,
545-
4,
547+
properties.getListConsumerGroupOffsetsPartitionSize(),
548+
properties.getListConsumerGroupOffsetsConcurrency(),
546549
call,
547550
mapMerger()
548551
);

0 commit comments

Comments
 (0)