Skip to content

Commit 9f4bf6f

Browse files
committed
Use cache to speedup consumers query
1 parent 7f31c68 commit 9f4bf6f

File tree

2 files changed

+282
-9
lines changed

2 files changed

+282
-9
lines changed

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
import io.kafbat.ui.model.InternalConsumerGroup;
99
import io.kafbat.ui.model.InternalTopicConsumerGroup;
1010
import io.kafbat.ui.model.KafkaCluster;
11+
import io.kafbat.ui.model.ServerStatusDTO;
1112
import io.kafbat.ui.model.SortOrderDTO;
13+
import io.kafbat.ui.model.Statistics;
1214
import io.kafbat.ui.service.index.ConsumerGroupFilter;
15+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
1316
import io.kafbat.ui.service.rbac.AccessControlService;
1417
import io.kafbat.ui.util.ApplicationMetrics;
1518
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -19,9 +22,9 @@
1922
import java.util.HashMap;
2023
import java.util.List;
2124
import java.util.Map;
25+
import java.util.Optional;
2226
import java.util.Properties;
2327
import java.util.Set;
24-
import java.util.function.Predicate;
2528
import java.util.function.ToIntFunction;
2629
import java.util.stream.Collectors;
2730
import java.util.stream.Stream;
@@ -43,6 +46,7 @@ public class ConsumerGroupService {
4346
private final AdminClientService adminClientService;
4447
private final AccessControlService accessControlService;
4548
private final ClustersProperties clustersProperties;
49+
private final StatisticsCache statisticsCache;
4650

4751
private Mono<List<InternalConsumerGroup>> getConsumerGroups(
4852
ReactiveAdminClient ac,
@@ -71,19 +75,19 @@ public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaClu
7175
return adminClientService.get(cluster)
7276
.flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
7377
.flatMap(endOffsets ->
74-
describeConsumerGroups(ac).flatMap(groups ->
75-
filterConsumerGroups(ac, groups, topic, endOffsets)
78+
describeConsumerGroups(cluster, ac, true).flatMap(groups ->
79+
filterConsumerGroups(cluster, ac, groups, topic, endOffsets)
7680
)
7781
)
7882
);
7983
}
8084

8185
private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
86+
KafkaCluster cluster,
8287
ReactiveAdminClient ac,
8388
List<ConsumerGroupDescription> groups,
8489
String topic,
8590
Map<TopicPartition, Long> endOffsets) {
86-
List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());
8791

8892
Set<ConsumerGroupState> inactiveStates = Set.of(
8993
ConsumerGroupState.DEAD,
@@ -98,9 +102,26 @@ private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
98102
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, false))
99103
.toList();
100104

101-
List<ConsumerGroupDescription> filtered = new ArrayList<>();
105+
List<ConsumerGroupDescription> dead = partitioned.get(false);
106+
if (!dead.isEmpty()) {
107+
Statistics statistics = statisticsCache.get(cluster);
108+
if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
109+
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
110+
statistics.getClusterState().getConsumerGroupsStates();
111+
dead = dead.stream().filter(g ->
112+
Optional.ofNullable(consumerGroupsStates.get(g.groupId()))
113+
.map(s ->
114+
s.committedOffsets().keySet().stream().anyMatch(tp -> tp.topic().equals(topic))
115+
).orElse(false)
116+
).toList();
117+
}
118+
}
119+
120+
List<ConsumerGroupDescription> filtered = new ArrayList<>(stable.size() + dead.size());
102121
filtered.addAll(stable);
103-
filtered.addAll(partitioned.get(false));
122+
filtered.addAll(dead);
123+
124+
List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());
104125

105126
List<String> groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList();
106127
return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets ->
@@ -229,12 +250,52 @@ private <T> Stream<T> sortAndPaginate(Collection<T> collection,
229250
.limit(perPage);
230251
}
231252

232-
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdminClient ac) {
253+
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
254+
KafkaCluster cluster,
255+
ReactiveAdminClient ac,
256+
boolean cache) {
233257
return ac.listConsumerGroupNames()
234-
.flatMap(ac::describeConsumerGroups)
235-
.map(cgs -> new ArrayList<>(cgs.values()));
258+
.flatMap(names -> describeConsumerGroups(names, cluster, ac, cache));
236259
}
237260

261+
private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
262+
List<String> groupNames,
263+
KafkaCluster cluster,
264+
ReactiveAdminClient ac,
265+
boolean cache) {
266+
267+
Statistics statistics = statisticsCache.get(cluster);
268+
269+
if (cache && statistics.getStatus().equals(ServerStatusDTO.ONLINE)) {
270+
List<ConsumerGroupDescription> result = new ArrayList<>();
271+
List<String> notFound = new ArrayList<>();
272+
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupsStates =
273+
statistics.getClusterState().getConsumerGroupsStates();
274+
for (String groupName : groupNames) {
275+
ScrapedClusterState.ConsumerGroupState consumerGroupState = consumerGroupsStates.get(groupName);
276+
if (consumerGroupState != null) {
277+
result.add(consumerGroupState.description());
278+
} else {
279+
notFound.add(groupName);
280+
}
281+
}
282+
if (!notFound.isEmpty()) {
283+
return ac.describeConsumerGroups(notFound)
284+
.map(descriptions -> {
285+
result.addAll(descriptions.values());
286+
return result;
287+
});
288+
} else {
289+
return Mono.just(result);
290+
}
291+
} else {
292+
return ac.describeConsumerGroups(groupNames)
293+
.map(descriptions -> List.copyOf(descriptions.values()));
294+
}
295+
}
296+
297+
298+
238299

239300
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
240301
ReactiveAdminClient ac,
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
package io.kafbat.ui.service;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import com.google.common.collect.ImmutableTable;
6+
import io.kafbat.ui.config.ClustersProperties;
7+
import io.kafbat.ui.model.InternalTopicConsumerGroup;
8+
import io.kafbat.ui.model.KafkaCluster;
9+
import io.kafbat.ui.model.Metrics;
10+
import io.kafbat.ui.model.ServerStatusDTO;
11+
import io.kafbat.ui.model.Statistics;
12+
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
13+
import io.kafbat.ui.service.rbac.AccessControlService;
14+
import java.time.Instant;
15+
import java.util.HashMap;
16+
import java.util.HashSet;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.UUID;
20+
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
22+
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
23+
import org.apache.kafka.clients.admin.ConsumerGroupListing;
24+
import org.apache.kafka.clients.admin.MemberAssignment;
25+
import org.apache.kafka.clients.admin.MemberDescription;
26+
import org.apache.kafka.common.ConsumerGroupState;
27+
import org.apache.kafka.common.TopicPartition;
28+
import org.junit.jupiter.api.Test;
29+
import org.mockito.Mockito;
30+
import reactor.core.publisher.Mono;
31+
32+
class ConsumerGroupServiceTest {
33+
34+
@Test
35+
void getConsumerGroupsForTopicConsumerGroups() {
36+
// given
37+
ClustersProperties.Cluster clusterProperties = new ClustersProperties.Cluster();
38+
clusterProperties.setName("test");
39+
40+
ClustersProperties clustersProperties = new ClustersProperties();
41+
clustersProperties.getClusters().add(clusterProperties);
42+
43+
44+
KafkaCluster cluster = KafkaCluster.builder()
45+
.name("test")
46+
.originalProperties(clusterProperties)
47+
.build();
48+
49+
ReactiveAdminClient client = Mockito.mock(ReactiveAdminClient.class);
50+
AdminClientService admin = Mockito.mock(AdminClientService.class);
51+
Mockito.when(admin.get(cluster)).thenReturn(Mono.just(client));
52+
53+
final String topic = UUID.randomUUID().toString();
54+
final String anotherTopic = UUID.randomUUID().toString();
55+
56+
Map<String, ScrapedClusterState.ConsumerGroupState> consumersWithTopic =
57+
Stream.generate(() -> generate(
58+
List.of(new TopicPartition(topic, 0)),
59+
Map.of(new TopicPartition(topic, 0), 100L),
60+
ConsumerGroupState.DEAD
61+
)).limit(10).collect(Collectors.toMap(
62+
ScrapedClusterState.ConsumerGroupState::group,
63+
s -> s
64+
));
65+
66+
Map<String, ScrapedClusterState.ConsumerGroupState> consumersWithoutTopic =
67+
Stream.generate(() -> generate(
68+
List.of(new TopicPartition(anotherTopic, 0)),
69+
Map.of(new TopicPartition(anotherTopic, 0), 100L),
70+
ConsumerGroupState.DEAD
71+
)).limit(10).collect(Collectors.toMap(
72+
ScrapedClusterState.ConsumerGroupState::group,
73+
s -> s
74+
));
75+
76+
Map<String, ScrapedClusterState.ConsumerGroupState> stableConsumersWithTopic =
77+
Stream.generate(() -> generate(
78+
List.of(new TopicPartition(topic, 0)),
79+
Map.of(new TopicPartition(topic, 0), 100L),
80+
ConsumerGroupState.STABLE
81+
)).limit(10).collect(Collectors.toMap(
82+
ScrapedClusterState.ConsumerGroupState::group,
83+
s -> s
84+
));
85+
86+
Map<String, ScrapedClusterState.ConsumerGroupState> stableConsumersWithoutTopic =
87+
Stream.generate(() -> generate(
88+
List.of(new TopicPartition(anotherTopic, 0)),
89+
Map.of(new TopicPartition(anotherTopic, 0), 100L),
90+
ConsumerGroupState.STABLE
91+
)).limit(10).collect(Collectors.toMap(
92+
ScrapedClusterState.ConsumerGroupState::group,
93+
s -> s
94+
));
95+
96+
Map<String, ScrapedClusterState.ConsumerGroupState> consumerGroupStates = new HashMap<>();
97+
consumerGroupStates.putAll(consumersWithTopic);
98+
consumerGroupStates.putAll(consumersWithoutTopic);
99+
consumerGroupStates.putAll(stableConsumersWithTopic);
100+
consumerGroupStates.putAll(stableConsumersWithoutTopic);
101+
102+
Mockito.when(client.listConsumerGroups()).thenReturn(Mono.just(
103+
consumerGroupStates.keySet()
104+
.stream()
105+
.map(s -> new ConsumerGroupListing(s, false))
106+
.toList()
107+
));
108+
109+
Mockito.when(client.listConsumerGroupNames()).thenReturn(Mono.just(
110+
List.copyOf(consumerGroupStates.keySet())
111+
));
112+
113+
Mockito.when(client.listTopicOffsets(Mockito.eq(topic), Mockito.any(), Mockito.eq(false)))
114+
.thenReturn(Mono.just(Map.of(new TopicPartition(topic, 0), 100L)));
115+
116+
Mockito.when(client.describeConsumerGroups(
117+
Mockito.any())
118+
).thenReturn(
119+
Mono.just(
120+
consumerGroupStates.values().stream()
121+
.collect(Collectors.toMap(
122+
ScrapedClusterState.ConsumerGroupState::group,
123+
ScrapedClusterState.ConsumerGroupState::description
124+
))
125+
)
126+
);
127+
128+
Mockito.when(client.listConsumerGroupOffsets(Mockito.any(), Mockito.any())).thenAnswer(
129+
a -> {
130+
List<String> groupIds = (List<String>) a.getArgument(0);
131+
var table = ImmutableTable.<String, TopicPartition, Long>builder();
132+
for (String groupId : groupIds) {
133+
ScrapedClusterState.ConsumerGroupState state = consumerGroupStates.get(groupId);
134+
for (Map.Entry<TopicPartition, Long> entry : state.committedOffsets().entrySet()) {
135+
table.put(groupId, entry.getKey(), entry.getValue());
136+
}
137+
}
138+
return Mono.just(table.build());
139+
}
140+
);
141+
142+
ScrapedClusterState state = ScrapedClusterState.builder()
143+
.scrapeFinishedAt(Instant.now())
144+
.nodesStates(Map.of())
145+
.topicStates(Map.of())
146+
.consumerGroupsStates(consumerGroupStates)
147+
.build();
148+
149+
Statistics statistics = Statistics.builder()
150+
.status(ServerStatusDTO.ONLINE)
151+
.version("Unknown")
152+
.features(List.of())
153+
.clusterDescription(ReactiveAdminClient.ClusterDescription.empty())
154+
.metrics(Metrics.empty())
155+
.clusterState(state)
156+
.build();
157+
158+
StatisticsCache cache = Mockito.mock(StatisticsCache.class);
159+
Mockito.when(cache.get(cluster)).thenReturn(statistics);
160+
161+
AccessControlService acl = Mockito.mock(AccessControlService.class);
162+
ConsumerGroupService consumerGroupService =
163+
new ConsumerGroupService(admin, acl, clustersProperties, cache);
164+
165+
// should
166+
List<InternalTopicConsumerGroup> groups =
167+
consumerGroupService.getConsumerGroupsForTopic(cluster, topic).block();
168+
169+
assertThat(groups).size().isEqualTo(
170+
consumersWithTopic.size() + stableConsumersWithTopic.size()
171+
);
172+
173+
List<String> resultedGroupIds = groups.stream().map(InternalTopicConsumerGroup::getGroupId).toList();
174+
assertThat(resultedGroupIds).containsAll(consumersWithTopic.keySet());
175+
176+
assertThat(resultedGroupIds).containsAll(stableConsumersWithTopic.keySet());
177+
}
178+
179+
private ScrapedClusterState.ConsumerGroupState generate(
180+
List<TopicPartition> topicPartitions,
181+
Map<TopicPartition, Long> lastOffsets,
182+
ConsumerGroupState state
183+
) {
184+
final String name = UUID.randomUUID().toString();
185+
Map<TopicPartition, Long> commited = topicPartitions.stream()
186+
.map(tp -> Map.entry(tp, lastOffsets.get(tp) - 1))
187+
.collect(Collectors.toMap(
188+
Map.Entry::getKey,
189+
Map.Entry::getValue
190+
));
191+
192+
List<MemberDescription> members = state.equals(ConsumerGroupState.STABLE) ? List.of(
193+
new MemberDescription(
194+
UUID.randomUUID().toString(),
195+
UUID.randomUUID().toString(),
196+
"localhost",
197+
new MemberAssignment(new HashSet<>(topicPartitions))
198+
)
199+
) : List.of();
200+
201+
return new ScrapedClusterState.ConsumerGroupState(
202+
name,
203+
new ConsumerGroupDescription(
204+
name,
205+
false,
206+
members, "",
207+
state,
208+
null
209+
), commited
210+
);
211+
}
212+
}

0 commit comments

Comments
 (0)