Skip to content

Commit f9906b5

Browse files
iliaxMgrdichiliaxhabrahamyanpro
authored
Brokers API improvements (#2743)
* Brokers API improvements: 1. broker io rates stats added 2. active controller property set to node id 3. minor refactoring 4. FE: Add an indicator for an active broker controller Co-authored-by: Mgrdich <[email protected]> Co-authored-by: iliax <[email protected]> Co-authored-by: Hrant Abrahamyan <[email protected]> Co-authored-by: Mgrdich <[email protected]>
1 parent 7837622 commit f9906b5

File tree

17 files changed

+256
-99
lines changed

17 files changed

+256
-99
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterNa
3737
@Override
3838
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
3939
ServerWebExchange exchange) {
40-
return Mono.just(ResponseEntity.ok(brokerService.getBrokers(getCluster(clusterName))));
40+
return Mono.just(ResponseEntity.ok(
41+
brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto)));
4142
}
4243

4344
@Override

kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.provectus.kafka.ui.config.ClustersProperties;
44
import com.provectus.kafka.ui.model.BrokerConfigDTO;
5+
import com.provectus.kafka.ui.model.BrokerDTO;
56
import com.provectus.kafka.ui.model.BrokerDiskUsageDTO;
67
import com.provectus.kafka.ui.model.BrokerMetricsDTO;
78
import com.provectus.kafka.ui.model.ClusterDTO;
@@ -14,6 +15,7 @@
1415
import com.provectus.kafka.ui.model.ConnectDTO;
1516
import com.provectus.kafka.ui.model.FailoverUrlList;
1617
import com.provectus.kafka.ui.model.Feature;
18+
import com.provectus.kafka.ui.model.InternalBroker;
1719
import com.provectus.kafka.ui.model.InternalBrokerConfig;
1820
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
1921
import com.provectus.kafka.ui.model.InternalClusterState;
@@ -103,6 +105,8 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {
103105

104106
PartitionDTO toPartition(InternalPartition topic);
105107

108+
BrokerDTO toBrokerDto(InternalBroker broker);
109+
106110
@Named("setSchemaRegistry")
107111
default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) {
108112
if (clusterProperties == null
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.provectus.kafka.ui.model;
2+
3+
import java.math.BigDecimal;
4+
import lombok.Data;
5+
import org.apache.kafka.common.Node;
6+
7+
@Data
8+
public class InternalBroker {
9+
10+
private final Integer id;
11+
private final String host;
12+
private final Integer port;
13+
private final BigDecimal bytesInPerSec;
14+
private final BigDecimal bytesOutPerSec;
15+
16+
public InternalBroker(Node node, Statistics statistics) {
17+
this.id = node.id();
18+
this.host = node.host();
19+
this.port = node.port();
20+
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
21+
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
22+
}
23+
24+
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalClusterState.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.Optional;
77
import java.util.stream.Collectors;
88
import lombok.Data;
9+
import org.apache.kafka.common.Node;
910

1011
@Data
1112
public class InternalClusterState {
@@ -37,7 +38,9 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
3738
.orElse(null);
3839
topicCount = statistics.getTopicDescriptions().size();
3940
brokerCount = statistics.getClusterDescription().getNodes().size();
40-
activeControllers = statistics.getClusterDescription().getController() != null ? 1 : 0;
41+
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
42+
.map(Node::id)
43+
.orElse(null);
4144
version = statistics.getVersion();
4245

4346
if (statistics.getLogDirInfo() != null) {
@@ -53,15 +56,17 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
5356

5457
bytesInPerSec = statistics
5558
.getMetrics()
56-
.getBytesInPerSec()
59+
.getBrokerBytesInPerSec()
5760
.values().stream()
58-
.reduce(BigDecimal.ZERO, BigDecimal::add);
61+
.reduce(BigDecimal::add)
62+
.orElse(null);
5963

6064
bytesOutPerSec = statistics
6165
.getMetrics()
62-
.getBytesOutPerSec()
66+
.getBrokerBytesOutPerSec()
6367
.values().stream()
64-
.reduce(BigDecimal.ZERO, BigDecimal::add);
68+
.reduce(BigDecimal::add)
69+
.orElse(null);
6570

6671
var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
6772
onlinePartitionCount = partitionsStats.getOnlinePartitionCount();

kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ public static InternalTopic from(TopicDescription topicDescription,
102102
topic.segmentSize(segmentStats.getSegmentSize());
103103
}
104104

105-
topic.bytesInPerSec(metrics.getBytesInPerSec().get(topicDescription.name()));
106-
topic.bytesOutPerSec(metrics.getBytesOutPerSec().get(topicDescription.name()));
105+
topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
106+
topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));
107107

108108
topic.topicConfigs(
109109
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/Metrics.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@
1515
@Builder
1616
@Value
1717
public class Metrics {
18-
Map<String, BigDecimal> bytesInPerSec;
19-
Map<String, BigDecimal> bytesOutPerSec;
18+
19+
Map<Integer, BigDecimal> brokerBytesInPerSec;
20+
Map<Integer, BigDecimal> brokerBytesOutPerSec;
21+
Map<String, BigDecimal> topicBytesInPerSec;
22+
Map<String, BigDecimal> topicBytesOutPerSec;
2023
Map<Integer, List<RawMetric>> perBrokerMetrics;
2124

2225
public static Metrics empty() {
2326
return Metrics.builder()
24-
.bytesInPerSec(Map.of())
25-
.bytesOutPerSec(Map.of())
27+
.brokerBytesInPerSec(Map.of())
28+
.brokerBytesOutPerSec(Map.of())
29+
.topicBytesInPerSec(Map.of())
30+
.topicBytesOutPerSec(Map.of())
2631
.perBrokerMetrics(Map.of())
2732
.build();
2833
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import com.provectus.kafka.ui.exception.NotFoundException;
66
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
77
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
8-
import com.provectus.kafka.ui.model.BrokerDTO;
98
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
109
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
10+
import com.provectus.kafka.ui.model.InternalBroker;
1111
import com.provectus.kafka.ui.model.InternalBrokerConfig;
1212
import com.provectus.kafka.ui.model.KafkaCluster;
1313
import com.provectus.kafka.ui.service.metrics.RawMetric;
@@ -63,18 +63,13 @@ private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Intege
6363
.flatMapMany(Flux::fromIterable);
6464
}
6565

66-
public Flux<BrokerDTO> getBrokers(KafkaCluster cluster) {
66+
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
6767
return adminClientService
6868
.get(cluster)
6969
.flatMap(ReactiveAdminClient::describeCluster)
7070
.map(description -> description.getNodes().stream()
71-
.map(node -> {
72-
BrokerDTO broker = new BrokerDTO();
73-
broker.setId(node.id());
74-
broker.setHost(node.host());
75-
broker.setPort(node.port());
76-
return broker;
77-
}).collect(Collectors.toList()))
71+
.map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
72+
.collect(Collectors.toList()))
7873
.flatMapMany(Flux::fromIterable);
7974
}
8075

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.provectus.kafka.ui.service;
22

3-
import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
43
import static java.util.stream.Collectors.toList;
54
import static java.util.stream.Collectors.toMap;
65
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
@@ -43,6 +42,7 @@
4342
import org.apache.kafka.clients.admin.ConfigEntry;
4443
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
4544
import org.apache.kafka.clients.admin.ConsumerGroupListing;
45+
import org.apache.kafka.clients.admin.DescribeClusterOptions;
4646
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
4747
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
4848
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -53,6 +53,7 @@
5353
import org.apache.kafka.clients.admin.OffsetSpec;
5454
import org.apache.kafka.clients.admin.RecordsToDelete;
5555
import org.apache.kafka.clients.admin.TopicDescription;
56+
import org.apache.kafka.clients.consumer.ConsumerConfig;
5657
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5758
import org.apache.kafka.common.KafkaException;
5859
import org.apache.kafka.common.KafkaFuture;
@@ -125,7 +126,8 @@ private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String
125126
}
126127
}
127128

128-
//TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
129+
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
130+
// (see MonoSink.success(..) javadoc for details)
129131
private static <T> Mono<T> toMono(KafkaFuture<T> future) {
130132
return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
131133
if (ex != null) {
@@ -302,26 +304,19 @@ public Mono<ClusterDescription> describeCluster() {
302304
}
303305

304306
private static Mono<ClusterDescription> describeClusterImpl(AdminClient client) {
305-
var r = client.describeCluster();
306-
var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
307-
return Mono.create(sink -> all.whenComplete((res, ex) -> {
308-
if (ex != null) {
309-
sink.error(ex);
310-
} else {
311-
try {
312-
sink.success(
313-
new ClusterDescription(
314-
getUninterruptibly(r.controller()),
315-
getUninterruptibly(r.clusterId()),
316-
getUninterruptibly(r.nodes()),
317-
getUninterruptibly(r.authorizedOperations())
318-
)
319-
);
320-
} catch (ExecutionException e) {
321-
// can't be here, because all futures already completed
322-
}
323-
}
324-
}));
307+
var result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true));
308+
var allOfFuture = KafkaFuture.allOf(
309+
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
310+
return toMono(allOfFuture).then(
311+
Mono.fromCallable(() ->
312+
new ClusterDescription(
313+
result.controller().get(),
314+
result.clusterId().get(),
315+
result.nodes().get(),
316+
result.authorizedOperations().get()
317+
)
318+
)
319+
);
325320
}
326321

327322
private static Mono<String> getClusterVersion(AdminClient client) {

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/MetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map<Node,
6262
WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
6363
perBrokerMetrics.forEach((node, metrics) ->
6464
metrics.forEach(metric ->
65-
wellKnownMetrics.populate(cluster, node, metric)));
65+
wellKnownMetrics.populate(node, metric)));
6666
return wellKnownMetrics;
6767
}
6868

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/metrics/WellKnownMetrics.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
44
import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;
55

6-
import com.provectus.kafka.ui.model.KafkaCluster;
76
import com.provectus.kafka.ui.model.Metrics;
87
import java.math.BigDecimal;
98
import java.util.HashMap;
@@ -12,16 +11,42 @@
1211

1312
class WellKnownMetrics {
1413

14+
// per broker
15+
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
16+
final Map<Integer, BigDecimal> brokerBytesOutFifteenMinuteRate = new HashMap<>();
17+
18+
// per topic
1519
final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
1620
final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();
1721

18-
void populate(KafkaCluster cluster, Node node, RawMetric rawMetric) {
22+
void populate(Node node, RawMetric rawMetric) {
23+
updateBrokerIOrates(node, rawMetric);
1924
updateTopicsIOrates(rawMetric);
2025
}
2126

2227
void apply(Metrics.MetricsBuilder metricsBuilder) {
23-
metricsBuilder.bytesInPerSec(bytesInFifteenMinuteRate);
24-
metricsBuilder.bytesOutPerSec(bytesOutFifteenMinuteRate);
28+
metricsBuilder.topicBytesInPerSec(bytesInFifteenMinuteRate);
29+
metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate);
30+
metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate);
31+
metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate);
32+
}
33+
34+
private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
35+
String name = rawMetric.name();
36+
if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
37+
&& rawMetric.labels().size() == 1
38+
&& "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
39+
&& containsIgnoreCase(name, "BrokerTopicMetrics")
40+
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
41+
brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value());
42+
}
43+
if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
44+
&& rawMetric.labels().size() == 1
45+
&& "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
46+
&& containsIgnoreCase(name, "BrokerTopicMetrics")
47+
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
48+
brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
49+
}
2550
}
2651

2752
private void updateTopicsIOrates(RawMetric rawMetric) {

0 commit comments

Comments
 (0)