Skip to content

Commit 32a09fa

Browse files
yazebochanRoman Nedzvetskiy
andauthored
Added bytesIn/OutPerSec params for clusterMetrics object (#56)
* Added bytesIn/OutPerSec params for clusterMetrics object * Removed redundant todos, cleaned imports * Jmx connections moved to pool, methods moved to separate classes * Added pool handling and returning methods * Fix after previous PR comments - fixed result map, configured pool, removed redundant methods and code * Removed redundant imports and empty initialization * Removed fill method * Closing connection replaced to destroyObject method * Try catch block while returning object to pool was fixed * Removed redundant logs and try catch Co-authored-by: Roman Nedzvetskiy <[email protected]>
1 parent f974feb commit 32a09fa

File tree

15 files changed

+195
-36
lines changed

15 files changed

+195
-36
lines changed

kafka-ui-api/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@
100100
<artifactId>reactor-test</artifactId>
101101
<scope>test</scope>
102102
</dependency>
103+
<dependency>
104+
<groupId>org.apache.commons</groupId>
105+
<artifactId>commons-pool2</artifactId>
106+
<version>${apache.commons.version}</version>
107+
</dependency>
108+
103109
</dependencies>
104110

105111
<build>

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/ClustersProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ public static class Cluster {
2121
String zookeeper;
2222
String schemaRegistry;
2323
String schemaNameTemplate = "%s-value";
24+
int jmxPort;
2425
}
2526
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.provectus.kafka.ui.cluster.config;
2+
3+
import com.provectus.kafka.ui.cluster.util.JmxPoolFactory;
4+
import org.apache.commons.pool2.KeyedObjectPool;
5+
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
6+
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.jmx.export.MBeanExporter;
10+
11+
import javax.management.remote.JMXConnector;
12+
13+
@Configuration
14+
public class Config {
15+
16+
@Bean
17+
public KeyedObjectPool<String, JMXConnector> pool() {
18+
GenericKeyedObjectPool<String, JMXConnector> pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
19+
pool.setConfig(poolConfig());
20+
return pool;
21+
}
22+
23+
private GenericKeyedObjectPoolConfig poolConfig() {
24+
GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
25+
poolConfig.setMaxIdlePerKey(3);
26+
poolConfig.setMaxTotalPerKey(3);
27+
return poolConfig;
28+
}
29+
30+
@Bean
31+
public MBeanExporter exporter()
32+
{
33+
final MBeanExporter exporter = new MBeanExporter();
34+
exporter.setAutodetect(true);
35+
exporter.setExcludedBeans("pool");
36+
return exporter;
37+
}
38+
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalClusterMetrics.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import lombok.Builder;
44
import lombok.Data;
55

6+
import java.math.BigDecimal;
67
import java.util.Map;
78

89

@@ -18,11 +19,9 @@ public class InternalClusterMetrics {
1819
private final int offlinePartitionCount;
1920
private final int inSyncReplicasCount;
2021
private final int outOfSyncReplicasCount;
21-
//TODO: find way to fill
22-
private final int bytesInPerSec;
23-
private final int bytesOutPerSec;
22+
private final Map<String, BigDecimal> bytesInPerSec;
23+
private final Map<String, BigDecimal> bytesOutPerSec;
2424
private final int segmentCount;
25-
//TODO: find way to fill
2625
private final long segmentSize;
2726
private final Map<Integer, InternalBrokerMetrics> internalBrokerMetrics;
2827
private final int zooKeeperStatus;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111
public class KafkaCluster {
1212

1313
private final String name;
14-
private final String jmxHost;
15-
private final String jmxPort;
14+
private final int jmxPort;
1615
private final String bootstrapServers;
1716
private final String zookeeper;
1817
private final String schemaRegistry;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import com.provectus.kafka.ui.model.*;
1010
import lombok.RequiredArgsConstructor;
1111
import lombok.SneakyThrows;
12-
13-
import org.apache.kafka.clients.admin.ConsumerGroupListing;
1412
import org.apache.kafka.clients.consumer.ConsumerConfig;
1513
import org.apache.kafka.clients.consumer.KafkaConsumer;
1614
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -20,7 +18,6 @@
2018
import reactor.core.publisher.Flux;
2119
import reactor.core.publisher.Mono;
2220

23-
import java.time.OffsetDateTime;
2421
import java.util.*;
2522
import java.util.stream.Collectors;
2623
import java.util.stream.Stream;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ConsumingService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) {
112112
.collect(Collectors.toMap(
113113
partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()),
114114
Map.Entry::getValue
115-
));
115+
));
116116
consumer.offsetsForTimes(timestampsToSearch)
117117
.forEach((topicPartition, offsetAndTimestamp) ->
118118
consumer.seek(topicPartition, offsetAndTimestamp.offset())

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/MetricsUpdateService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
44
import com.provectus.kafka.ui.kafka.KafkaService;
5-
import com.provectus.kafka.ui.zookeeper.ZookeeperService;
65
import lombok.RequiredArgsConstructor;
76
import lombok.extern.log4j.Log4j2;
87
import org.springframework.stereotype.Service;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
55
import com.provectus.kafka.ui.model.*;
66
import lombok.extern.slf4j.Slf4j;
7-
import com.provectus.kafka.ui.model.TopicMessage;
8-
97
import org.apache.kafka.clients.admin.*;
108
import org.apache.kafka.clients.consumer.ConsumerRecord;
119
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -15,13 +13,11 @@
1513
import org.apache.kafka.common.config.ConfigResource;
1614
import org.apache.kafka.common.record.TimestampType;
1715
import org.apache.kafka.common.utils.Bytes;
18-
1916
import reactor.core.publisher.Mono;
2017

2118
import java.time.Instant;
2219
import java.time.OffsetDateTime;
2320
import java.time.ZoneId;
24-
import java.util.HashMap;
2521
import java.util.*;
2622
import java.util.stream.Collectors;
2723
import java.util.stream.Stream;
@@ -32,23 +28,27 @@
3228
@Slf4j
3329
public class ClusterUtil {
3430

31+
32+
33+
34+
3535
private static final String CLUSTER_VERSION_PARAM_KEY = "inter.broker.protocol.version";
3636

3737
private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
3838

39-
public static <T> Mono<T> toMono(KafkaFuture<T> future){
40-
return Mono.create(sink -> future.whenComplete((res, ex)->{
41-
if (ex!=null) {
39+
public static <T> Mono<T> toMono(KafkaFuture<T> future) {
40+
return Mono.create(sink -> future.whenComplete((res, ex) -> {
41+
if (ex != null) {
4242
sink.error(ex);
4343
} else {
4444
sink.success(res);
4545
}
4646
}));
4747
}
4848

49-
public static Mono<String> toMono(KafkaFuture<Void> future, String topicName){
50-
return Mono.create(sink -> future.whenComplete((res, ex)->{
51-
if (ex!=null) {
49+
public static Mono<String> toMono(KafkaFuture<Void> future, String topicName) {
50+
return Mono.create(sink -> future.whenComplete((res, ex) -> {
51+
if (ex != null) {
5252
sink.error(ex);
5353
} else {
5454
sink.success(topicName);
@@ -111,7 +111,7 @@ public static InternalTopic mapToInternalTopic(TopicDescription topicDescription
111111
partitionDto.inSyncReplicasCount(partition.isr().size());
112112
partitionDto.replicasCount(partition.replicas().size());
113113
List<InternalReplica> replicas = partition.replicas().stream().map(
114-
r -> new InternalReplica(r.id(), partition.leader().id()!=r.id(), partition.isr().contains(r)))
114+
r -> new InternalReplica(r.id(), partition.leader().id() != r.id(), partition.isr().contains(r)))
115115
.collect(Collectors.toList());
116116
partitionDto.replicas(replicas);
117117
return partitionDto.build();
@@ -185,6 +185,7 @@ private static TopicMessage.TimestampTypeEnum mapToTimestampType(TimestampType t
185185
throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
186186
}
187187
}
188+
188189
public static Mono<Set<ExtendedAdminClient.SupportedFeature>> getSupportedFeatures(AdminClient adminClient) {
189190
return ClusterUtil.toMono(adminClient.describeCluster().controller())
190191
.map(Node::id)
@@ -210,7 +211,7 @@ private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Ma
210211
}
211212
}
212213

213-
public static Topic convertToTopic (InternalTopic internalTopic) {
214+
public static Topic convertToTopic(InternalTopic internalTopic) {
214215
Topic topic = new Topic();
215216
topic.setName(internalTopic.getName());
216217
List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.provectus.kafka.ui.cluster.util;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.apache.commons.pool2.KeyedObjectPool;
6+
import org.springframework.stereotype.Component;
7+
8+
import javax.management.*;
9+
import javax.management.remote.JMXConnector;
10+
import java.io.IOException;
11+
import java.math.BigDecimal;
12+
import java.net.MalformedURLException;
13+
import java.util.Arrays;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
@Component
19+
@Slf4j
20+
@RequiredArgsConstructor
21+
public class JmxClusterUtil {
22+
23+
private final KeyedObjectPool<String, JMXConnector> pool;
24+
25+
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
26+
private static final String JMX_SERVICE_TYPE = "jmxrmi";
27+
28+
public static final String BYTES_IN_PER_SEC = "BytesInPerSec";
29+
public static final String BYTES_OUT_PER_SEC = "BytesOutPerSec";
30+
private static final String BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_IN_PER_SEC;
31+
private static final String BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME = "kafka.server:type=BrokerTopicMetrics,name=" + BYTES_OUT_PER_SEC;
32+
33+
private static final List<String> attrNames = Arrays.asList("OneMinuteRate", "FiveMinuteRate", "FifteenMinuteRate");
34+
35+
public Map<String, BigDecimal> getJmxTrafficMetrics(int jmxPort, String jmxHost, String metricName) {
36+
String jmxUrl = JMX_URL + jmxHost + ":" + jmxPort + "/" + JMX_SERVICE_TYPE;
37+
Map<String, BigDecimal> result = new HashMap<>();
38+
JMXConnector srv = null;
39+
try {
40+
srv = pool.borrowObject(jmxUrl);
41+
MBeanServerConnection msc = srv.getMBeanServerConnection();
42+
ObjectName name = metricName.equals(BYTES_IN_PER_SEC) ? new ObjectName(BYTES_IN_PER_SEC_MBEAN_OBJECT_NAME) :
43+
new ObjectName(BYTES_OUT_PER_SEC_MBEAN_OBJECT_NAME);
44+
for (String attrName : attrNames) {
45+
result.put(attrName, BigDecimal.valueOf((Double) msc.getAttribute(name, attrName)));
46+
}
47+
pool.returnObject(jmxUrl, srv);
48+
} catch (MalformedURLException url) {
49+
log.error("Cannot create JmxServiceUrl from {}", jmxUrl);
50+
closeConnectionExceptionally(jmxUrl, srv);
51+
} catch (IOException io) {
52+
log.error("Cannot connect to KafkaJmxServer with url {}", jmxUrl);
53+
closeConnectionExceptionally(jmxUrl, srv);
54+
} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
55+
log.error("Cannot find attribute", e);
56+
closeConnectionExceptionally(jmxUrl, srv);
57+
} catch (MalformedObjectNameException objectNameE) {
58+
log.error("Cannot create objectName", objectNameE);
59+
closeConnectionExceptionally(jmxUrl, srv);
60+
} catch (Exception e) {
61+
log.error("Error while retrieving connection {} from pool", jmxUrl);
62+
closeConnectionExceptionally(jmxUrl, srv);
63+
}
64+
return result;
65+
}
66+
67+
private void closeConnectionExceptionally(String url, JMXConnector srv) {
68+
try {
69+
pool.invalidateObject(url, srv);
70+
} catch (Exception e) {
71+
log.error("Cannot invalidate object in pool, {}", url);
72+
}
73+
}
74+
}

0 commit comments

Comments
 (0)