|
1 | 1 | # Library Instrumentation for Spring Kafka version 2.7 and higher |
2 | 2 |
|
3 | 3 | Provides OpenTelemetry instrumentation for [Spring Kafka](https://spring.io/projects/spring-kafka), |
4 | | -enabling consumer messaging spans for Spring Kafka listeners |
| 4 | +enabling consumer and producer messaging spans. |
5 | 5 |
|
6 | 6 | ## Quickstart |
7 | 7 |
|
@@ -30,26 +30,38 @@ implementation("io.opentelemetry.instrumentation:opentelemetry-spring-kafka-2.7: |
30 | 30 | ### Usage |
31 | 31 |
|
32 | 32 | The instrumentation library provides interceptors that can be added to Spring Kafka message |
33 | | -listener containers to provide OpenTelemetry-based spans and context propagation. |
| 33 | +listener containers and producers to provide spans and context propagation. |
34 | 34 |
|
35 | 35 | ```java |
36 | 36 | import io.opentelemetry.api.OpenTelemetry; |
| 37 | +import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; |
37 | 38 | import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; |
| 39 | +import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; |
| 40 | +import org.springframework.context.annotation.Bean; |
| 41 | +import org.springframework.context.annotation.Configuration; |
38 | 42 | import org.springframework.kafka.config.ContainerCustomizer; |
39 | 43 | import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; |
40 | 44 |
|
41 | | -public class SpringKafkaConfiguration { |
| 45 | +@Configuration |
| 46 | +public class KafkaInstrumentationConfig { |
42 | 47 |
|
43 | | - // Use this ContainerCustomizer to add interceptors to your Kafka listener containers. |
| 48 | + // Instrument Kafka producers |
| 49 | + @Bean |
| 50 | + public DefaultKafkaProducerFactoryCustomizer producerInstrumentation( |
| 51 | + OpenTelemetry openTelemetry) { |
| 52 | + KafkaTelemetry kafkaTelemetry = KafkaTelemetry.create(openTelemetry); |
| 53 | + return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap); |
| 54 | + } |
| 55 | + |
| 56 | + // Instrument Kafka consumers |
| 57 | + @Bean |
44 | 58 | public ContainerCustomizer<String, String, ConcurrentMessageListenerContainer<String, String>> |
45 | | - createListenerCustomizer(OpenTelemetry openTelemetry) { |
46 | | - SpringKafkaTelemetry telemetry = SpringKafkaTelemetry.builder(openTelemetry).build(); |
| 59 | + listenerCustomizer(OpenTelemetry openTelemetry) { |
| 60 | + SpringKafkaTelemetry springKafkaTelemetry = SpringKafkaTelemetry.create(openTelemetry); |
47 | 61 | return container -> { |
48 | | - container.setRecordInterceptor(telemetry.createRecordInterceptor()); |
49 | | - container.setBatchInterceptor(telemetry.createBatchInterceptor()); |
| 62 | + container.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor()); |
| 63 | + container.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor()); |
50 | 64 | }; |
51 | 65 | } |
52 | | - |
53 | | - // Configure the customizer in your Spring Kafka configuration. |
54 | 66 | } |
55 | 67 | ``` |
0 commit comments