
Commit bad57f8

Merge pull request #15 from tinawenqiao/dev
Add health check in KafkaController.
2 parents 8f41c27 + 3bae9de commit bad57f8

File tree: 7 files changed (+135, -8 lines)

src/main/java/org/gnuhpc/bigdata/config/KafkaConfig.java

Lines changed: 5 additions & 1 deletion

@@ -1,6 +1,7 @@
 package org.gnuhpc.bigdata.config;
 
 import lombok.Data;
+import lombok.Getter;
 import lombok.extern.log4j.Log4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,6 +33,7 @@
 @Data
 @EnableKafka
 @Configuration
+@Getter
 public class KafkaConfig {
   @Value("${kafka.brokers}")
   private String brokers;
@@ -42,10 +44,12 @@ public class KafkaConfig {
   @Value("${kafka.offset.partitions}")
   private int internalTopicPartitions;
 
-
   @Value("${spring.kafka.consumer.group-id}")
   private String groupId;
 
+  @Value("${kafka.healthcheck.topic}")
+  private String healthCheckTopic;
+
   @Bean(initMethod = "init", destroyMethod = "destroy")
   public KafkaUtils kafkaUtils() {
     return new KafkaUtils();

src/main/java/org/gnuhpc/bigdata/controller/KafkaController.java

Lines changed: 6 additions & 0 deletions

@@ -271,4 +271,10 @@ private void isTopicExist(String topic) throws InvalidTopicException {
       throw new InvalidTopicException("Topic " + topic + " non-exist!");
     }
   }
+
+  @GetMapping(value = "/health")
+  @ApiOperation(value = "Check the cluster health.")
+  public HealthCheckResult healthCheck() {
+    return kafkaAdminService.healthCheck();
+  }
 }
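This adds a GET /health endpoint backed by KafkaAdminService. A minimal sketch of calling it with Java 11's built-in HTTP client, assuming the service is reachable at http://localhost:8080 and the controller has no extra path prefix (host, port, and prefix are assumptions, not part of this commit):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HealthCheckClient {
  public static void main(String[] args) throws Exception {
    // Base URL is an assumption; point it at wherever the REST service is deployed.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/health"))
        .GET()
        .build();

    // The endpoint returns a JSON-serialized HealthCheckResult (timestamp, status, msg).
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}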
src/main/java/org/gnuhpc/bigdata/model/HealthCheckResult.java

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+package org.gnuhpc.bigdata.model;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+
+@Getter
+@Setter
+public class HealthCheckResult {
+  @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss")
+  private LocalDateTime timestamp;
+  public String status;
+  public String msg;
+
+  public HealthCheckResult() {
+    this.timestamp = LocalDateTime.now();
+  }
+}
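A quick sketch of how the @JsonFormat pattern renders the timestamp when HealthCheckResult is serialized. Inside the Spring Boot application Jackson is typically configured automatically; the standalone example below registers JavaTimeModule explicitly and assumes jackson-databind, jackson-datatype-jsr310, and Lombok are on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.gnuhpc.bigdata.model.HealthCheckResult;

public class HealthCheckResultJsonDemo {
  public static void main(String[] args) throws Exception {
    // JavaTimeModule lets Jackson handle LocalDateTime; the @JsonFormat
    // pattern on the field controls the rendered string.
    ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());

    HealthCheckResult result = new HealthCheckResult();
    result.setStatus("ok");
    result.setMsg("healthcheck_1514736000000");

    // Expected shape (values will differ):
    // {"timestamp":"2018-01-01 00:00:00","status":"ok","msg":"healthcheck_1514736000000"}
    System.out.println(mapper.writeValueAsString(result));
  }
}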

src/main/java/org/gnuhpc/bigdata/service/KafkaAdminService.java

Lines changed: 67 additions & 4 deletions

@@ -22,21 +22,21 @@
 import lombok.extern.log4j.Log4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.gnuhpc.bigdata.CollectionConvertor;
 import org.gnuhpc.bigdata.componet.OffsetStorage;
-import org.gnuhpc.bigdata.constant.ConsumerState;
+import org.gnuhpc.bigdata.config.KafkaConfig;
 import org.gnuhpc.bigdata.constant.ConsumerType;
 import org.gnuhpc.bigdata.constant.GeneralResponseState;
 import org.gnuhpc.bigdata.model.*;
@@ -84,6 +84,9 @@ public class KafkaAdminService {
   @Autowired
   private KafkaUtils kafkaUtils;
 
+  @Autowired
+  private KafkaConfig kafkaConfig;
+
   @Autowired
   private OffsetStorage storage;
 
@@ -1046,4 +1049,64 @@ private boolean isConsumerGroupActive(String consumerGroup, ConsumerType type) {
       throw new ApiException("Unknown type " + type);
     }
   }
+
+  public HealthCheckResult healthCheck() {
+    String healthCheckTopic = kafkaConfig.getHealthCheckTopic();
+    HealthCheckResult healthCheckResult = new HealthCheckResult();
+    KafkaProducer producer = kafkaUtils.createProducer();
+    KafkaConsumer consumer = kafkaUtils.createNewConsumerByTopic(healthCheckTopic);
+
+    boolean healthCheckTopicExist = existTopic(healthCheckTopic);
+    log.info("HealthCheckTopic:" + healthCheckTopic + " existed:" + healthCheckTopicExist);
+    if (!healthCheckTopicExist) {
+      healthCheckResult.setStatus("unknown");
+      healthCheckResult.setMsg("HealthCheckTopic: " + healthCheckTopic + " Non-Exist. Please create it before doing health check.");
+      return healthCheckResult;
+    }
+
+    String message = "healthcheck_" + System.currentTimeMillis();
+    ProducerRecord<String, String> record = new ProducerRecord(healthCheckTopic, null, message);
+    log.info("Generate message:" + message);
+    try {
+      RecordMetadata recordMetadata = (RecordMetadata) producer.send(record).get();
+      log.info("Message:" + message + " has been sent to Partition:" + recordMetadata.partition());
+    } catch (Exception e) {
+      healthCheckResult.setStatus("error");
+      healthCheckResult.setMsg("Health Check: Produce Message Failure. Exception: " + e.getMessage());
+      log.error("Health Check: Produce Message Failure.", e);
+      return healthCheckResult;
+    } finally {
+      producer.close();
+    }
+
+    int retries = 30;
+    int noRecordsCount = 0;
+    while (true) {
+      final ConsumerRecords<Long, String> consumerRecords =
+          consumer.poll(1000);
+      if (consumerRecords.count() == 0) {
+        noRecordsCount++;
+        if (noRecordsCount > retries) break;
+        else continue;
+      }
+      Iterator<ConsumerRecord<Long, String>> iterator = consumerRecords.iterator();
+      while (iterator.hasNext()) {
+        ConsumerRecord msg = iterator.next();
+        log.info("Health Check: Fetch Message " + msg.value() + ", offset:" + msg.offset());
+        if (msg.value().equals(message)) {
+          healthCheckResult.setStatus("ok");
+          healthCheckResult.setMsg(message);
+          return healthCheckResult;
+        }
+      }
+      consumer.commitAsync();
+    }
+    consumer.close();
+
+    if (healthCheckResult.getStatus() == null) {
+      healthCheckResult.setStatus("error");
+      healthCheckResult.setMsg("Health Check: Consume Message Failure. Consumer can't fetch the message.");
+    }
+    return healthCheckResult;
+  }
 }
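healthCheck() reports status "unknown" until the topic named by kafka.healthcheck.topic exists in the cluster. One way to create it ahead of time is Kafka's AdminClient API (available in kafka-clients 0.11+); a minimal sketch, where the broker address, partition count, and replication factor are illustrative assumptions:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateHealthCheckTopic {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Broker list is an assumption; use the same value as kafka.brokers in the yml profile.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // "health" matches kafka.healthcheck.topic in application-dev.yml; 1 partition
      // and replication factor 1 are illustrative defaults, not from the commit.
      NewTopic healthTopic = new NewTopic("health", 1, (short) 1);
      admin.createTopics(Collections.singleton(healthTopic)).all().get();
    }
  }
}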

src/main/java/org/gnuhpc/bigdata/utils/KafkaUtils.java

Lines changed: 33 additions & 3 deletions

@@ -9,6 +9,7 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.ApiException;
@@ -20,6 +21,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -45,10 +47,10 @@ public class KafkaUtils {
 
   public void init(){
     prop = new Properties();
-    prop.setProperty("bootstrap.servers",kafkaConfig.getBrokers());
-    prop.setProperty("key.serializer",
+    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBrokers());
+    prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.StringSerializer");
-    prop.setProperty("value.serializer",
+    prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.StringSerializer");
     producer = new KafkaProducer(prop);
     log.info("Kafka initing...");
@@ -80,6 +82,34 @@ public KafkaConsumer createNewConsumer(String consumerGroup) {
     return new KafkaConsumer(properties);
   }
 
+  public KafkaConsumer createNewConsumerByTopic(String topic){
+    Properties properties = new Properties();
+    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaConfig().getBrokers());
+    properties.put(ConsumerConfig.GROUP_ID_CONFIG, DEFAULTCP);
+    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getCanonicalName());
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+        StringDeserializer.class.getCanonicalName());
+    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
+    kafkaConsumer.subscribe(Collections.singletonList(topic));
+
+    return kafkaConsumer;
+  }
+
+  public KafkaProducer createProducer() {
+    Properties prop = new Properties();
+    prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBrokers());
+    prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringSerializer");
+    prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.StringSerializer");
+    prop.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
+    prop.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
+    producer = new KafkaProducer(prop);
+
+    return producer;
+  }
+
   public Node getLeader(String topic, int partitionId) {
     KafkaConsumer consumer = createNewConsumer(DEFAULTCP);
     List<PartitionInfo> tmList = consumer.partitionsFor(topic);

src/main/resources/application-dev.yml

Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,8 @@ kafka:
     topic: "__consumer_offsets"
     partitions: 50
     reset: true
+  healthcheck:
+    topic: "health"
 
 zookeeper:
   uris: DPFTMP06:2181,DPFTMP07:2181,DPFTMP08:2181,DPFTMP09:2181,DPFTMP10:2181

src/main/resources/application-tina.yml

Lines changed: 2 additions & 0 deletions

@@ -4,6 +4,8 @@ kafka:
     topic: "__consumer_offsets"
     partitions: 50
    reset: true
+  healthcheck:
+    topic: "health"
 
 zookeeper:
   uris: localhost:2181
