|
5 | 5 |
|
6 | 6 | package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka; |
7 | 7 |
|
8 | | -import io.opentelemetry.api.OpenTelemetry; |
9 | 8 | import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; |
10 | | -import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; |
11 | | -import org.springframework.beans.factory.ObjectProvider; |
| 9 | +import java.lang.reflect.Field; |
| 10 | +import java.util.function.Supplier; |
12 | 11 | import org.springframework.beans.factory.config.BeanPostProcessor; |
| 12 | +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; |
13 | 13 | import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; |
| 14 | +import org.springframework.kafka.listener.BatchInterceptor; |
| 15 | +import org.springframework.kafka.listener.RecordInterceptor; |
14 | 16 |
|
15 | 17 | class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor { |
16 | 18 |
|
17 | | - private final ObjectProvider<OpenTelemetry> openTelemetryProvider; |
18 | | - private final ObjectProvider<ConfigProperties> configPropertiesProvider; |
| 19 | + private final Supplier<SpringKafkaTelemetry> springKafkaTelemetry; |
19 | 20 |
|
20 | 21 | ConcurrentKafkaListenerContainerFactoryPostProcessor( |
21 | | - ObjectProvider<OpenTelemetry> openTelemetryProvider, |
22 | | - ObjectProvider<ConfigProperties> configPropertiesProvider) { |
23 | | - this.openTelemetryProvider = openTelemetryProvider; |
24 | | - this.configPropertiesProvider = configPropertiesProvider; |
| 22 | + Supplier<SpringKafkaTelemetry> springKafkaTelemetry) { |
| 23 | + this.springKafkaTelemetry = springKafkaTelemetry; |
25 | 24 | } |
26 | 25 |
|
| 26 | + @SuppressWarnings("unchecked") |
27 | 27 | @Override |
28 | 28 | public Object postProcessAfterInitialization(Object bean, String beanName) { |
29 | 29 | if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) { |
30 | 30 | return bean; |
31 | 31 | } |
32 | 32 |
|
33 | | - ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory = |
34 | | - (ConcurrentKafkaListenerContainerFactory<?, ?>) bean; |
35 | | - SpringKafkaTelemetry springKafkaTelemetry = |
36 | | - SpringKafkaTelemetry.builder(openTelemetryProvider.getObject()) |
37 | | - .setCaptureExperimentalSpanAttributes( |
38 | | - configPropertiesProvider |
39 | | - .getObject() |
40 | | - .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) |
41 | | - .build(); |
42 | | - listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor()); |
43 | | - listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor()); |
| 33 | + ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory = |
| 34 | + (ConcurrentKafkaListenerContainerFactory<Object, Object>) bean; |
| 35 | + SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get(); |
| 36 | + |
| 37 | + // use reflection to read existing values to avoid overwriting user configured interceptors |
| 38 | + BatchInterceptor<Object, Object> batchInterceptor = |
| 39 | + readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class); |
| 40 | + RecordInterceptor<Object, Object> recordInterceptor = |
| 41 | + readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class); |
| 42 | + listenerContainerFactory.setBatchInterceptor( |
| 43 | + springKafkaTelemetry.createBatchInterceptor(batchInterceptor)); |
| 44 | + listenerContainerFactory.setRecordInterceptor( |
| 45 | + springKafkaTelemetry.createRecordInterceptor(recordInterceptor)); |
44 | 46 |
|
45 | 47 | return listenerContainerFactory; |
46 | 48 | } |
| 49 | + |
| 50 | + private static <T> T readField(Object container, String filedName, Class<T> fieldType) { |
| 51 | + try { |
| 52 | + Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName); |
| 53 | + field.setAccessible(true); |
| 54 | + return fieldType.cast(field.get(container)); |
| 55 | + } catch (Exception exception) { |
| 56 | + return null; |
| 57 | + } |
| 58 | + } |
47 | 59 | } |
0 commit comments