Skip to content

Commit 446b040

Browse files
committed
GH-3741: Fix metric tag to show underlying exception type
Fixes: #3741 Issue: #3741 When exceptions occur in Kafka listeners, the metrics currently show `ListenerExecutionFailedException` in both the `error` tag (when using observation) and `exception` tag (when using micrometer without observation), rather than the actual underlying exception. * Modify ListenerContainer to pass actual exception to failure metrics * Update MessagingMessageListenerAdapter to report cause to observation * Add MicrometerMetricsTests to verify both observation and non-observation metrics * Fix ObservationTests to verify correct error reporting in metrics This ensures metrics show the actual underlying exception while maintaining existing span behavior. Signed-off-by: Soby Chacko <[email protected]> **Auto-cherry-pick to `3.3.x` & `3.2.x`**
1 parent 59c7f97 commit 446b040

File tree

4 files changed

+229
-9
lines changed

4 files changed

+229
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2357,13 +2357,18 @@ private void successTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?
23572357
}
23582358
}
23592359

2360-
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record) {
2360+
private void failureTimer(@Nullable Object sample, @Nullable ConsumerRecord<?, ?> record,
2361+
Throwable exception) {
23612362
if (sample != null) {
2363+
String exceptionName = exception.getCause() != null
2364+
? exception.getCause().getClass().getSimpleName()
2365+
: exception.getClass().getSimpleName();
2366+
23622367
if (this.micrometerTagsProvider == null || record == null) {
2363-
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException");
2368+
this.micrometerHolder.failure(sample, exceptionName);
23642369
}
23652370
else {
2366-
this.micrometerHolder.failure(sample, "ListenerExecutionFailedException", record);
2371+
this.micrometerHolder.failure(sample, exceptionName, record);
23672372
}
23682373
}
23692374
}
@@ -2441,7 +2446,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24412446
}
24422447
catch (RuntimeException e) {
24432448
this.batchFailed = true;
2444-
failureTimer(sample, null);
2449+
failureTimer(sample, null, e);
24452450
batchInterceptAfter(records, e);
24462451
throw e;
24472452
}
@@ -2776,7 +2781,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27762781
recordInterceptAfter(cRecord, null);
27772782
}
27782783
catch (RuntimeException e) {
2779-
failureTimer(sample, cRecord);
2784+
failureTimer(sample, cRecord, e);
27802785
recordInterceptAfter(cRecord, e);
27812786
if (!isListenerAdapterObservationAware()) {
27822787
observation.error(e);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
425425
}
426426
catch (ListenerExecutionFailedException e) {
427427
listenerError = e;
428-
currentObservation.error(e);
428+
currentObservation.error(e.getCause() != null ? e.getCause() : e);
429429
handleException(records, acknowledgment, consumer, message, e);
430430
}
431431
catch (Error e) {
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import java.util.Map;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import io.micrometer.core.instrument.MeterRegistry;
24+
import io.micrometer.core.instrument.Timer;
25+
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
26+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
27+
import io.micrometer.observation.ObservationRegistry;
28+
import io.micrometer.tracing.test.simple.SimpleTracer;
29+
import org.apache.kafka.clients.admin.AdminClientConfig;
30+
import org.apache.kafka.clients.consumer.ConsumerRecord;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.annotation.EnableKafka;
37+
import org.springframework.kafka.annotation.KafkaListener;
38+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39+
import org.springframework.kafka.core.ConsumerFactory;
40+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
42+
import org.springframework.kafka.core.KafkaAdmin;
43+
import org.springframework.kafka.core.KafkaTemplate;
44+
import org.springframework.kafka.core.ProducerFactory;
45+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
46+
import org.springframework.kafka.test.context.EmbeddedKafka;
47+
import org.springframework.kafka.test.utils.KafkaTestUtils;
48+
import org.springframework.test.annotation.DirtiesContext;
49+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
50+
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
53+
/**
54+
* @author Soby Chacko
55+
* @since 3.3.3
56+
*/
57+
@SpringJUnitConfig
58+
@EmbeddedKafka(topics = { MicrometerMetricsTests.METRICS_TEST_TOPIC }, partitions = 1)
59+
@DirtiesContext
60+
public class MicrometerMetricsTests {
61+
62+
public final static String METRICS_TEST_TOPIC = "metrics.test.topic";
63+
64+
@Test
65+
void verifyMetricsWithoutObservation(@Autowired MetricsListener listener,
66+
@Autowired MeterRegistry meterRegistry,
67+
@Autowired KafkaTemplate<Integer, String> template)
68+
throws Exception {
69+
70+
template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS);
71+
assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue();
72+
73+
Timer timer = meterRegistry.find("spring.kafka.listener")
74+
.tags("name", "metricsTest-0")
75+
.tag("result", "failure")
76+
.timer();
77+
78+
assertThat(timer).isNotNull();
79+
assertThat(timer.getId().getTag("exception"))
80+
.isEqualTo("IllegalStateException");
81+
}
82+
83+
@Test
84+
void verifyMetricsWithObservation(@Autowired ObservationListener observationListener,
85+
@Autowired MeterRegistry meterRegistry,
86+
@Autowired KafkaTemplate<Integer, String> template)
87+
throws Exception {
88+
89+
template.send(METRICS_TEST_TOPIC, "test").get(10, TimeUnit.SECONDS);
90+
assertThat(observationListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
91+
92+
Timer timer = meterRegistry.find("spring.kafka.listener")
93+
.tag("spring.kafka.listener.id", "observationTest-0")
94+
.tag("error", "IllegalStateException")
95+
.timer();
96+
97+
assertThat(timer).isNotNull();
98+
}
99+
100+
@Configuration
101+
@EnableKafka
102+
static class Config {
103+
104+
@Bean
105+
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
106+
return new KafkaAdmin(Map.of(
107+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
108+
broker.getBrokersAsString()));
109+
}
110+
111+
@Bean
112+
ProducerFactory<Integer, String> producerFactory(EmbeddedKafkaBroker broker) {
113+
return new DefaultKafkaProducerFactory<>(
114+
KafkaTestUtils.producerProps(broker));
115+
}
116+
117+
@Bean
118+
ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
119+
return new DefaultKafkaConsumerFactory<>(
120+
KafkaTestUtils.consumerProps("metrics", "false", broker));
121+
}
122+
123+
@Bean
124+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
125+
return new KafkaTemplate<>(pf);
126+
}
127+
128+
@Bean
129+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
130+
ConsumerFactory<Integer, String> cf) {
131+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
132+
new ConcurrentKafkaListenerContainerFactory<>();
133+
factory.setConsumerFactory(cf);
134+
factory.getContainerProperties().setMicrometerEnabled(true);
135+
factory.getContainerProperties().setObservationEnabled(false);
136+
return factory;
137+
}
138+
139+
@Bean
140+
ConcurrentKafkaListenerContainerFactory<Integer, String> observationListenerContainerFactory(
141+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
142+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
143+
new ConcurrentKafkaListenerContainerFactory<>();
144+
factory.setConsumerFactory(cf);
145+
factory.getContainerProperties().setObservationEnabled(true);
146+
factory.getContainerProperties().setObservationRegistry(observationRegistry);
147+
return factory;
148+
}
149+
150+
@Bean
151+
MetricsListener metricsListener() {
152+
return new MetricsListener();
153+
}
154+
155+
@Bean
156+
MeterRegistry meterRegistry() {
157+
return new SimpleMeterRegistry();
158+
}
159+
160+
@Bean
161+
ObservationListener observationListener() {
162+
return new ObservationListener();
163+
}
164+
165+
@Bean
166+
SimpleTracer simpleTracer() {
167+
return new SimpleTracer();
168+
}
169+
170+
@Bean
171+
ObservationRegistry observationRegistry(MeterRegistry meterRegistry) {
172+
ObservationRegistry observationRegistry = ObservationRegistry.create();
173+
observationRegistry.observationConfig()
174+
.observationHandler(new DefaultMeterObservationHandler(meterRegistry));
175+
return observationRegistry;
176+
}
177+
}
178+
179+
static class MetricsListener {
180+
final CountDownLatch latch = new CountDownLatch(1);
181+
182+
@KafkaListener(id = "metricsTest", topics = METRICS_TEST_TOPIC)
183+
void listen(ConsumerRecord<Integer, String> in) {
184+
try {
185+
throw new IllegalStateException("metrics test exception");
186+
}
187+
finally {
188+
latch.countDown();
189+
}
190+
}
191+
}
192+
193+
static class ObservationListener {
194+
final CountDownLatch latch = new CountDownLatch(1);
195+
196+
@KafkaListener(id = "observationTest",
197+
topics = METRICS_TEST_TOPIC,
198+
containerFactory = "observationListenerContainerFactory")
199+
void listen(ConsumerRecord<Integer, String> in) {
200+
try {
201+
throw new IllegalStateException("observation test exception");
202+
}
203+
finally {
204+
latch.countDown();
205+
}
206+
}
207+
}
208+
209+
}
210+

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,9 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
359359
@Test
360360
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
361361
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
362-
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config)
363-
throws ExecutionException, InterruptedException, TimeoutException {
362+
@Autowired KafkaListenerEndpointRegistry endpointRegistry,
363+
@Autowired MeterRegistry meterRegistry, @Autowired Config config)
364+
throws ExecutionException, InterruptedException, TimeoutException {
364365

365366
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
366367
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
@@ -372,10 +373,14 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
372373
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
373374
span = spans.poll();
374375
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
375-
assertThat(span.getError().getCause())
376+
assertThat(span.getError())
376377
.isInstanceOf(IllegalStateException.class)
377378
.hasMessage("obs4 run time exception");
378379

380+
assertThat(meterRegistry.get("spring.kafka.listener")
381+
.tag("error", "IllegalStateException")
382+
.timer().count()).isEqualTo(1);
383+
379384
assertThat(config.scopeInFailureReference.get()).isNotNull();
380385
}
381386

0 commit comments

Comments
 (0)