
Commit efc35a9

yazebochan and Roman Nedzvetskiy authored
Backend jmx metrics (#64)
* Start doing endpoint for JMX metrics
* Added endpoint for getting JMX metric per broker
* Cluster JMX metrics sum endpoint added
* Added endpoints for cluster metrics and broker metrics
* Cleared some code
* Fixed JMX metrics names
* Changed to all values in metrics
* Removed redundant imports
* Renamed param constant
* Changed to calculate broker and cluster metrics in one place
* Removed redundant imports
* Fixed some mistakes
* Replaced multiple method usages with a single one
* Fixed multiple calls
* Removed cluster-level metrics, now only broker-level metrics in cluster
* Just small fixes
* Removed redundant variable
* Renamed method for cluster-level metrics
* Fixed after PR and added sum for numeric cluster metrics by "num" and "persec" keywords in canonical name
* Added MetricDto object
* Added list of metrics to enum
* Renames and optimizations
* Renamed jmxMetrics objects param to metrics

Co-authored-by: Roman Nedzvetskiy <[email protected]>
1 parent 66afaa4 commit efc35a9
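The "sum ... by num and persec keywords in canonical name" step mentioned in the message is not itself visible in this diff. As a rough, hypothetical sketch of that aggregation, using only the MetricDto class added by this commit (the JmxMetricsAggregator class and its method name are made up for illustration):

import com.provectus.kafka.ui.cluster.model.MetricDto;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JmxMetricsAggregator {

    // Sum per-broker JMX values into cluster-level totals, keyed by canonical name.
    // Only metrics whose canonical name contains "num" or "persec" are treated as
    // summable counters/rates, mirroring the keywords named in the commit message.
    public static Map<String, BigDecimal> sumByCanonicalName(List<MetricDto> brokerMetrics) {
        Map<String, BigDecimal> clusterTotals = new HashMap<>();
        for (MetricDto metric : brokerMetrics) {
            String name = metric.getCanonicalName().toLowerCase();
            if (name.contains("persec") || name.contains("num")) {
                clusterTotals.merge(metric.getCanonicalName(), metric.getValue(), BigDecimal::add);
            }
        }
        return clusterTotals;
    }
}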

File tree

16 files changed: +260 -118 lines changed

docker/kafka-clusters-only.yaml

Lines changed: 40 additions & 44 deletions
@@ -23,39 +23,35 @@ services:
       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:9092,PLAINTEXT_HOST://localhost:29091 #,PLAIN://kafka0:29090
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT #,PLAIN:PLAINTEXT
       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
+      JMX_PORT: 9997
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  kafka01:
+    image: confluentinc/cp-kafka:5.1.0
+    depends_on:
+      - zookeeper0
+    ports:
+      - 29093:29093
+      - 9999:9999
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
       JMX_PORT: 9997
       KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
-#
-#  kafka01:
-#    image: confluentinc/cp-kafka:5.1.0
-#    depends_on:
-#      - zookeeper0
-#    ports:
-#      - 29093:29093
-#      - 9999:9999
-#    environment:
-#      KAFKA_BROKER_ID: 2
-#      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2183
-#      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka01:9092,PLAINTEXT_HOST://localhost:29093,PLAIN://kafka0:29090
-#      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,PLAIN:PLAINTEXT
-#      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-#      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
-#      JMX_PORT: 9997
-#      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9997
 
   kafka-init-topics0:
     image: confluentinc/cp-kafka:5.1.0
     depends_on:
       - kafka0
-      - kafka1
-    command:
-      "kafka-console-producer --broker-list kafka1:9092 --topic secondUsers && \
-      This is message 1 && \
-      This is message 2 && \
-      This is message 3 && \
-      Message 4 && \
-      Message 5"
+    command: "bash -c 'echo Waiting for Kafka to be ready... && \
+      cub kafka-ready -b kafka0:9092 1 20 && \
+      kafka-topics --create --topic users --partitions 2 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183 && \
+      kafka-topics --create --topic messages --partitions 3 --replication-factor 2 --if-not-exists --zookeeper zookeeper0:2183'"
     environment:
       KAFKA_BROKER_ID: ignored
       KAFKA_ZOOKEEPER_CONNECT: ignored
@@ -101,21 +97,21 @@ services:
     networks:
       - default
 
-#  schemaregistry0:
-#    image: confluentinc/cp-schema-registry:5.1.0
-#    depends_on:
-#      - zookeeper0
-#      - kafka0
-#      - kafka01
-#    ports:
-#      - 8085:8085
-#    environment:
-#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
-#      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
-#      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
-#      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
-#      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
-#
-#      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
-#      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
-#      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.1.0
+    depends_on:
+      - zookeeper0
+      - kafka0
+      - kafka01
+    ports:
+      - 8085:8085
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:9092,PLAINTEXT://kafka01:9092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2183
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
+      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
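The compose change gives each broker an unauthenticated JMX/RMI endpoint on port 9997, which is what the backend polls. Below is a minimal standalone sketch (not code from this commit) of attaching to such an endpoint with the standard javax.management API and reading one broker MBean; the host, port, and MBean name are illustrative:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxProbe {
    public static void main(String[] args) throws Exception {
        // Matches the JMX_PORT / KAFKA_JMX_OPTS settings above (RMI on port 9997, no auth, no SSL).
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9997/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            // A standard broker-level Kafka metric: one-minute incoming byte rate.
            ObjectName bytesIn = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");
            Object oneMinuteRate = connection.getAttribute(bytesIn, "OneMinuteRate");
            System.out.println("BytesInPerSec.OneMinuteRate = " + oneMinuteRate);
        }
    }
}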

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java

Lines changed: 9 additions & 0 deletions
@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.cluster.config;
 
+import com.provectus.kafka.ui.cluster.util.JmxMetricsNames;
 import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
 import org.apache.commons.pool2.KeyedObjectPool;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
@@ -9,6 +10,9 @@
 import org.springframework.jmx.export.MBeanExporter;
 
 import javax.management.remote.JMXConnector;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Configuration
 public class Config {
@@ -35,4 +39,9 @@ public MBeanExporter exporter()
         exporter.setExcludedBeans("pool");
         return exporter;
     }
+
+    @Bean
+    public List<String> jmxMetricsNames() {
+        return Stream.of(JmxMetricsNames.values()).map(Enum::name).collect(Collectors.toList());
+    }
 }
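The new jmxMetricsNames bean publishes the constant names of JmxMetricsNames, an enum referenced here but not shown in this diff. Its exact contents are an assumption; presumably it lists the broker MBean names to collect, roughly like this:

package com.provectus.kafka.ui.cluster.util;

// Assumed shape of the enum behind Config.jmxMetricsNames(); the real constants
// live elsewhere in the commit and may differ.
public enum JmxMetricsNames {
    MessagesInPerSec,
    BytesInPerSec,
    BytesOutPerSec,
    RequestsPerSec
    // ... further broker-level metric names
}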

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

Lines changed: 2 additions & 10 deletions
@@ -9,24 +9,16 @@
 import org.mapstruct.Mapper;
 import org.mapstruct.Mapping;
 
-import java.math.BigDecimal;
-
 @Mapper(componentModel = "spring")
 public interface ClusterMapper {
 
-    KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
-
     @Mapping(target = "brokerCount", source = "metrics.brokerCount")
     @Mapping(target = "onlinePartitionCount", source = "metrics.onlinePartitionCount")
     @Mapping(target = "topicCount", source = "metrics.topicCount")
-    @Mapping(target = "bytesInPerSec", source = "metrics.bytesInPerSec")
-    @Mapping(target = "bytesOutPerSec", source = "metrics.bytesOutPerSec")
+    @Mapping(target = "metrics", source = "metrics.metrics")
     Cluster toCluster(KafkaCluster cluster);
 
-    default BigDecimal map (Number number) {
-        return new BigDecimal(number.toString());
-    }
-
+    KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
    BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
    Topic toTopic(InternalTopic topic);
    TopicDetails toTopicDetails(InternalTopic topic);
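For context on the mapping change: MapStruct generates the implementation of this interface, and the new @Mapping copies the nested metrics list instead of the removed bytesInPerSec/bytesOutPerSec fields, which also makes the BigDecimal converter unnecessary. A paraphrase of what the generated toCluster roughly does, not code from this commit:

// Simplified paraphrase of the MapStruct-generated mapper; the real generated class
// also copies the remaining Cluster fields and performs null checks per property.
public Cluster toCluster(KafkaCluster cluster) {
    if (cluster == null) {
        return null;
    }
    Cluster dto = new Cluster();
    if (cluster.getMetrics() != null) {
        dto.setBrokerCount(cluster.getMetrics().getBrokerCount());
        dto.setOnlinePartitionCount(cluster.getMetrics().getOnlinePartitionCount());
        dto.setTopicCount(cluster.getMetrics().getTopicCount());
        dto.setMetrics(cluster.getMetrics().getMetrics()); // replaces bytesInPerSec/bytesOutPerSec
    }
    return dto;
}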
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java

Lines changed: 4 additions & 0 deletions
@@ -1,10 +1,14 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;
 
+import java.util.List;
+
 @Data
 @Builder(toBuilder = true)
 public class InternalBrokerMetrics {
     private final Long segmentSize;
+    private final List<Metric> jmxMetrics;
 }

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

Lines changed: 3 additions & 1 deletion
@@ -1,9 +1,10 @@
 package com.provectus.kafka.ui.cluster.model;
 
+import com.provectus.kafka.ui.model.Metric;
 import lombok.Builder;
 import lombok.Data;
 
-import java.math.BigDecimal;
+import java.util.List;
 import java.util.Map;
 
 
@@ -24,5 +25,6 @@ public class InternalClusterMetrics {
     private final int segmentCount;
     private final long segmentSize;
     private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
+    private final List<Metric> metrics;
     private final int zooKeeperStatus;
 }

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@ public class InternalTopic {
     private final int inSyncReplicas;
     private final int replicationFactor;
     private final int underReplicatedPartitions;
-    //TODO: find way to fill
     private final long segmentSize;
     private final int segmentCount;
     private final Map<TopicPartition, Long> partitionSegmentSize;
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/MetricDto.java

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.cluster.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+import java.math.BigDecimal;
+
+@Getter
+@AllArgsConstructor
+public class MetricDto {
+    private String canonicalName;
+    private String metricName;
+    private BigDecimal value;
+}
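MetricDto pairs a JMX MBean's canonical name with a short metric name and a numeric value. A minimal sketch of how such an object could be built from an MBean attribute; the MetricDtoFactory helper below is illustrative and not part of the commit:

import com.provectus.kafka.ui.cluster.model.MetricDto;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import java.math.BigDecimal;

public class MetricDtoFactory {

    // Read one numeric attribute of a broker MBean and wrap it as a MetricDto.
    public static MetricDto fromAttribute(MBeanServerConnection connection,
                                          ObjectName mbean,
                                          String attribute) throws Exception {
        Object raw = connection.getAttribute(mbean, attribute);
        return new MetricDto(
                mbean.getCanonicalName(),     // e.g. kafka.server:name=BytesInPerSec,type=BrokerTopicMetrics
                mbean.getKeyProperty("name"), // short name, e.g. BytesInPerSec
                new BigDecimal(raw.toString()));
    }
}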

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

Lines changed: 10 additions & 5 deletions
@@ -38,15 +38,20 @@ public List<Cluster> getClusters() {
                 .collect(Collectors.toList());
     }
 
-    public Optional<BrokersMetrics> getBrokersMetrics(String name) {
-        return clustersStorage.getClusterByName(name)
+    public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
+        return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
                 .map(KafkaCluster::getMetrics)
-                .map(clusterMapper::toBrokerMetrics);
+                .map(s -> {
+                    var brokerMetrics = clusterMapper.toBrokerMetrics(s);
+                    brokerMetrics.setMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
+                    brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
+                    return brokerMetrics;
+                }));
     }
 
     public List<Topic> getTopics(String name) {
         return clustersStorage.getClusterByName(name)
-                .map( c ->
+                .map(c ->
                         c.getTopics().values().stream()
                                 .map(clusterMapper::toTopic)
                                 .collect(Collectors.toList())
@@ -127,6 +132,7 @@ public Flux<Broker> getBrokers (String clusterName) {
                 .map(n -> n.stream().map(node -> {
                     Broker broker = new Broker();
                     broker.setId(node.idString());
+                    broker.setHost(node.host());
                     return broker;
                 }).collect(Collectors.toList())))
                 .flatMapMany(Flux::fromIterable);
@@ -154,6 +160,5 @@ public Flux<TopicMessage> getMessages(String clusterName, String topicName, Cons
         return clustersStorage.getClusterByName(clusterName)
                 .map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
                 .orElse(Flux.empty());
-
     }
 }
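getBrokersMetrics now takes the cluster name plus a broker id and returns a Mono, so the REST layer can serve per-broker JMX metrics reactively. The controller wiring is outside this diff; a hedged sketch of a matching WebFlux handler, with an assumed route path and controller name:

import com.provectus.kafka.ui.cluster.service.ClusterService;
import com.provectus.kafka.ui.model.BrokersMetrics;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class BrokersMetricsController {

    private final ClusterService clusterService;

    public BrokersMetricsController(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    // Hypothetical path; the real API route is defined elsewhere in the project.
    @GetMapping("/api/clusters/{clusterName}/metrics/broker/{id}")
    public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(@PathVariable String clusterName,
                                                                  @PathVariable Integer id) {
        return clusterService.getBrokersMetrics(clusterName, id)
                .map(ResponseEntity::ok)
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }
}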

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

Lines changed: 2 additions & 6 deletions
@@ -1,7 +1,7 @@
 package com.provectus.kafka.ui.cluster.util;
 
-import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
+import com.provectus.kafka.ui.cluster.model.*;
 import com.provectus.kafka.ui.model.*;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.*;
@@ -28,10 +28,6 @@
 @Slf4j
 public class ClusterUtil {
 
-
-
-
-
     private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
@@ -56,7 +52,7 @@ public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
         }));
     }
 
-    public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c, KafkaCluster cluster) {
+    public static ConsumerGroup convertToConsumerGroup(ConsumerGroupDescription c) {
         ConsumerGroup consumerGroup = new ConsumerGroup();
         consumerGroup.setConsumerGroupId(c.groupId());
         consumerGroup.setNumConsumers(c.members().size());
