Skip to content

Commit bb7282d

Browse files
authored
BAEL-6136: micrometer spring kafka (#18473)
* BAEL-6136: micrometer + spring kafka * BAEL-6136: repackage * BAEL-6136: add curl example * BAEL-6136: add tracing
1 parent a467e30 commit bb7282d

File tree

11 files changed

+194
-4
lines changed

11 files changed

+194
-4
lines changed

spring-kafka-4/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
<groupId>org.springframework.kafka</groupId>
2828
<artifactId>spring-kafka</artifactId>
2929
</dependency>
30+
<dependency>
31+
<groupId>org.springframework.boot</groupId>
32+
<artifactId>spring-boot-starter-actuator</artifactId>
33+
</dependency>
3034

3135
<!-- test dependencies -->
3236
<dependency>
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.baeldung.kafka.monitoring;
2+
3+
public record ArticleCommentAddedEvent(String articleSlug, String articleAuthor, String comment, String commentAuthor) {
4+
5+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.baeldung.kafka.monitoring;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.messaging.handler.annotation.Header;
7+
import org.springframework.stereotype.Component;
8+
9+
@Component
10+
public class ArticleCommentsListener {
11+
12+
private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);
13+
14+
@KafkaListener(
15+
topics = "baeldung.article-comment.added",
16+
containerFactory = "customKafkaListenerContainerFactory"
17+
)
18+
public void onArticleComment(ArticleCommentAddedEvent event, @Header("traceparent") String traceParent) {
19+
log.info("Kafka Message Received: Comment Added: " + event);
20+
// some logic here...
21+
}
22+
23+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.baeldung.kafka.monitoring;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
import org.springframework.beans.factory.annotation.Qualifier;
6+
import org.springframework.kafka.core.KafkaTemplate;
7+
import org.springframework.web.bind.annotation.PathVariable;
8+
import org.springframework.web.bind.annotation.PostMapping;
9+
import org.springframework.web.bind.annotation.RequestBody;
10+
import org.springframework.web.bind.annotation.RequestMapping;
11+
import org.springframework.web.bind.annotation.RestController;
12+
13+
@RestController
14+
@RequestMapping("/api")
15+
public class ArticleCommentsRestController {
16+
17+
private static final Logger log = LoggerFactory.getLogger(ArticleCommentsRestController.class);
18+
19+
private final KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate;
20+
21+
public ArticleCommentsRestController(
22+
@Qualifier("articleCommentsKafkaTemplate") KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate) {
23+
this.articleCommentsKafkaTemplate = articleCommentsKafkaTemplate;
24+
}
25+
26+
@PostMapping("/articles/{articleSlug}/comments")
27+
Response addArticleComment(
28+
@PathVariable("articleSlug") String articleSlug,
29+
@RequestBody ArticleCommentAddedDto dto
30+
) {
31+
32+
log.info("HTTP Request received to save article comment: " + dto);
33+
// some logic here (eg: save to DB)
34+
35+
var event = new ArticleCommentAddedEvent(articleSlug, dto.articleAuthor(), dto.comment(), dto.commentAuthor());
36+
articleCommentsKafkaTemplate.send("baeldung.article-comment.added", articleSlug, event);
37+
38+
return new Response("Success", articleSlug);
39+
}
40+
41+
record Response(String status, String articleSlug) {
42+
43+
}
44+
45+
record ArticleCommentAddedDto(String articleAuthor, String comment, String commentAuthor) {
46+
47+
}
48+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.baeldung.kafka.monitoring;
2+
3+
import static java.util.Collections.singletonList;
4+
5+
import java.util.Map;
6+
7+
import org.springframework.beans.factory.annotation.Qualifier;
8+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
12+
import org.springframework.kafka.core.ConsumerFactory;
13+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
14+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
15+
import org.springframework.kafka.core.KafkaTemplate;
16+
import org.springframework.kafka.core.MicrometerConsumerListener;
17+
import org.springframework.kafka.core.MicrometerProducerListener;
18+
import org.springframework.kafka.core.ProducerFactory;
19+
import org.springframework.kafka.listener.ContainerProperties;
20+
21+
import io.micrometer.core.instrument.ImmutableTag;
22+
import io.micrometer.core.instrument.MeterRegistry;
23+
24+
@Configuration
25+
class KafkaConfig {
26+
27+
@Bean
28+
ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
29+
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
30+
cf.addListener(new MicrometerConsumerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
31+
return cf;
32+
}
33+
34+
@Bean
35+
ProducerFactory<String, ArticleCommentAddedEvent> producerFactory(KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
36+
ProducerFactory<String, ArticleCommentAddedEvent> pf = new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
37+
pf.addListener(new MicrometerProducerListener<>(meterRegistry, singletonList(new ImmutableTag("app-name", "article-comments-app"))));
38+
return pf;
39+
}
40+
41+
@Bean
42+
@Qualifier("articleCommentsKafkaTemplate")
43+
KafkaTemplate<String, ArticleCommentAddedEvent> articleCommentsKafkaTemplate(ProducerFactory<String, ArticleCommentAddedEvent> producerFactory) {
44+
var template = new KafkaTemplate<>(producerFactory);
45+
46+
template.setObservationEnabled(true);
47+
template.setMicrometerTags(Map.of("topic", "baeldung.article-comment.added"));
48+
template.setMicrometerTagsProvider(record -> Map.of("article-slug", record.key()
49+
.toString()));
50+
51+
return template;
52+
}
53+
54+
@Bean
55+
ConcurrentKafkaListenerContainerFactory<String, String> customKafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
56+
57+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
58+
factory.setConsumerFactory(consumerFactory);
59+
60+
ContainerProperties containerProps = factory.getContainerProperties();
61+
containerProps.setObservationEnabled(true);
62+
containerProps.setMicrometerTags(Map.of("app-name", "article-comments-app"));
63+
containerProps.setMicrometerTagsProvider(record -> Map.of("article-slug", record.key()
64+
.toString()));
65+
66+
return factory;
67+
}
68+
69+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.baeldung.kafka.monitoring;
2+
3+
import org.springframework.boot.autoconfigure.SpringBootApplication;
4+
import org.springframework.boot.builder.SpringApplicationBuilder;
5+
6+
@SpringBootApplication
7+
class KafkaMonitoringApplication {
8+
9+
public static void main(String[] args) {
10+
new SpringApplicationBuilder()
11+
.profiles("monitoring")
12+
.sources(KafkaMonitoringApplication.class)
13+
.run(args);
14+
}
15+
16+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
2+
management:
3+
endpoints.web.exposure.include: '*'
4+
endpoint.health.show-details: always
5+
6+
spring:
7+
application:
8+
name: kafka-monitoring
9+
kafka:
10+
bootstrap-servers: localhost:9092
11+
consumer:
12+
group-id: baeldung-app-1
13+
auto-offset-reset: earliest
14+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
15+
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
16+
properties:
17+
spring.json.value.default.type: com.baeldung.kafka.monitoring.ArticleCommentAddedEvent
18+
spring.json.trusted.packages: com.baeldung.kafka.monitoring
19+
20+
producer:
21+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
22+
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

spring-kafka-4/src/main/resources/application.properties

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
server.port: 8081
2+
spring.kafka.bootstrap-servers: localhost:9092
3+
long.message.topic.name: longMessage

spring-kafka-4/src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
<configuration>
33
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
44
<encoder>
5-
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
5+
<pattern>%d{HH:mm:ss.SSS} [%thread] %X{traceId:-}-%X{spanId:-} %-5level %logger{36} - %msg%n
66
</pattern>
77
</encoder>
88
</appender>

0 commit comments

Comments
 (0)