Skip to content

Commit 6e03b47

Browse files
authored
GH-3686: Fix observation scope closure in the KafkaMLContainer (#3689)
Fixes: #3686 According to our investigation around the `try-with-resource`, it looks like the resource is already closed when we reach the `catch` block. * Rework `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener()` to `observation.openScope()` before the `try` and close it manually in the `finally` block * Verify `RecordInterceptor.failure()` has a scope in the `ObservationTests`
1 parent 4716ce1 commit 6e03b47

File tree

2 files changed

+32
-5
lines changed

2 files changed

+32
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2750,7 +2750,6 @@ private void pauseForNackSleep() {
27502750
* @throws Error an error.
27512751
*/
27522752
@Nullable
2753-
@SuppressWarnings("try")
27542753
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
27552754
Iterator<ConsumerRecord<K, V>> iterator) {
27562755

@@ -2763,7 +2762,9 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27632762
this.observationRegistry);
27642763

27652764
observation.start();
2766-
try (Observation.Scope ignored = observation.openScope()) {
2765+
Observation.Scope observationScope = observation.openScope();
2766+
// We cannot use 'try-with-resource' because the resource is closed just before catch block
2767+
try {
27672768
invokeOnMessage(cRecord);
27682769
successTimer(sample, cRecord);
27692770
recordInterceptAfter(cRecord, null);
@@ -2802,6 +2803,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
28022803
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
28032804
observation.stop();
28042805
}
2806+
observationScope.close();
28052807
}
28062808
return null;
28072809
}
@@ -4020,6 +4022,6 @@ private static class StopAfterFenceException extends KafkaException {
40204022

40214023
}
40224024

4023-
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { };
4025+
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
40244026

40254027
}

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
3333
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
3434
import io.micrometer.core.tck.MeterRegistryAssert;
35+
import io.micrometer.observation.Observation;
3536
import io.micrometer.observation.ObservationHandler;
3637
import io.micrometer.observation.ObservationRegistry;
3738
import io.micrometer.observation.tck.TestObservationRegistry;
@@ -45,6 +46,7 @@
4546
import io.micrometer.tracing.test.simple.SimpleSpan;
4647
import io.micrometer.tracing.test.simple.SimpleTracer;
4748
import org.apache.kafka.clients.admin.AdminClientConfig;
49+
import org.apache.kafka.clients.consumer.Consumer;
4850
import org.apache.kafka.clients.consumer.ConsumerConfig;
4951
import org.apache.kafka.clients.consumer.ConsumerRecord;
5052
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -74,6 +76,7 @@
7476
import org.springframework.kafka.core.KafkaTemplate;
7577
import org.springframework.kafka.core.ProducerFactory;
7678
import org.springframework.kafka.listener.MessageListenerContainer;
79+
import org.springframework.kafka.listener.RecordInterceptor;
7780
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
7881
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
7982
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -356,7 +359,7 @@ private void assertThatAdmin(Object object, KafkaAdmin admin, String brokersStri
356359
@Test
357360
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
358361
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
359-
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
362+
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired Config config)
360363
throws ExecutionException, InterruptedException, TimeoutException {
361364

362365
runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
@@ -372,6 +375,8 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
372375
assertThat(span.getError().getCause())
373376
.isInstanceOf(IllegalStateException.class)
374377
.hasMessage("obs4 run time exception");
378+
379+
assertThat(config.scopeInFailureReference.get()).isNotNull();
375380
}
376381

377382
@Test
@@ -445,6 +450,8 @@ public static class Config {
445450

446451
KafkaAdmin mockAdmin = mock(KafkaAdmin.class);
447452

453+
AtomicReference<Observation.Scope> scopeInFailureReference = new AtomicReference<>();
454+
448455
@Bean
449456
KafkaAdmin admin(EmbeddedKafkaBroker broker) {
450457
String[] brokers = StringUtils.commaDelimitedListToStringArray(broker.getBrokersAsString());
@@ -512,7 +519,7 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
512519

513520
@Bean
514521
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
515-
ConsumerFactory<Integer, String> cf) {
522+
ConsumerFactory<Integer, String> cf, ObservationRegistry observationRegistry) {
516523

517524
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
518525
new ConcurrentKafkaListenerContainerFactory<>();
@@ -522,6 +529,24 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
522529
if (container.getListenerId().equals("obs3")) {
523530
container.setKafkaAdmin(this.mockAdmin);
524531
}
532+
if (container.getListenerId().equals("obs4")) {
533+
container.setRecordInterceptor(new RecordInterceptor<>() {
534+
535+
@Override
536+
public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String> record,
537+
Consumer<Integer, String> consumer) {
538+
539+
return record;
540+
}
541+
542+
@Override
543+
public void failure(ConsumerRecord<Integer, String> record, Exception exception,
544+
Consumer<Integer, String> consumer) {
545+
546+
Config.this.scopeInFailureReference.set(observationRegistry.getCurrentObservationScope());
547+
}
548+
});
549+
}
525550
});
526551
return factory;
527552
}

0 commit comments

Comments
 (0)