diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index de258bac48..09947227b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2750,7 +2750,6 @@ private void pauseForNackSleep() { * @throws Error an error. */ @Nullable - @SuppressWarnings("try") private RuntimeException doInvokeRecordListener(final ConsumerRecord cRecord, // NOSONAR Iterator> iterator) { @@ -2763,7 +2762,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco this.observationRegistry); observation.start(); - try (Observation.Scope ignored = observation.openScope()) { + Observation.Scope observationScope = observation.openScope(); + // We cannot use 'try-with-resource' because the resource is closed just before catch block + try { invokeOnMessage(cRecord); successTimer(sample, cRecord); recordInterceptAfter(cRecord, null); @@ -2802,6 +2803,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco if (!(this.listener instanceof RecordMessagingMessageListenerAdapter)) { observation.stop(); } + observationScope.close(); } return null; } @@ -4020,6 +4022,6 @@ private static class StopAfterFenceException extends KafkaException { } - private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { }; + private record FailedRecordTuple(ConsumerRecord record, RuntimeException ex) { } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 0a48bf63fd..a020bf0363 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -32,6 +32,7 @@ import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.core.tck.MeterRegistryAssert; +import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationHandler; import io.micrometer.observation.ObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistry; @@ -45,6 +46,7 @@ import io.micrometer.tracing.test.simple.SimpleSpan; import io.micrometer.tracing.test.simple.SimpleTracer; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; @@ -74,6 +76,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.RecordInterceptor; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -356,7 +359,7 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri @Test void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer, @Autowired @Qualifier("throwableTemplate") KafkaTemplate runtimeExceptionTemplate, - @Autowired KafkaListenerEndpointRegistry endpointRegistry) + @Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config) throws ExecutionException, InterruptedException, TimeoutException { runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS); @@ -372,6 +375,8 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir assertThat(span.getError().getCause()) .isInstanceOf(IllegalStateException.class) .hasMessage("obs4 run time exception"); + + assertThat(config.scopeInFailureReference.get()).isNotNull(); } @Test @@ -445,6 +450,8 @@ public static class Config { KafkaAdmin mockAdmin = mock(KafkaAdmin.class); + AtomicReference scopeInFailureReference = new AtomicReference<>(); + @Bean KafkaAdmin admin(EmbeddedKafkaBroker broker) { String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString()); @@ -512,7 +519,7 @@ KafkaTemplate reuseAdminBeanKafkaTemplate( @Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConsumerFactory cf) { + ConsumerFactory cf, ObservationRegistry observationRegistry) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); @@ -522,6 +529,24 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerF if (container.getListenerId().equals("obs3")) { container.setKafkaAdmin(this.mockAdmin); } + if (container.getListenerId().equals("obs4")) { + container.setRecordInterceptor(new RecordInterceptor<>() { + + @Override + public ConsumerRecord intercept(ConsumerRecord record, + Consumer consumer) { + + return record; + } + + @Override + public void failure(ConsumerRecord record, Exception exception, + Consumer consumer) { + + Config.this.scopeInFailureReference.set(observationRegistry.getCurrentObservationScope()); + } + }); + } }); return factory; }