Skip to content

Commit a75b3d0

Browse files
authored
GH-2467: Micrometer Observation for Streams
Resolves #2467 * Fix race in test. * Add tracing test; fix possible NPEs in stream contexts. * Change input dir for doc generation. * Strip package from new conventions in gen'd doc. * Docs; polishing; fix doc generation for duplicate enum values. * Use stream convention, regardless of native listener or not.
1 parent 974a4a9 commit a75b3d0

22 files changed

+1201
-221
lines changed

build.gradle

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,7 @@ project('spring-rabbit-stream') {
474474

475475
api project(':spring-rabbit')
476476
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
477+
optionalApi 'io.micrometer:micrometer-core'
477478

478479
testApi project(':spring-rabbit-junit')
479480
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
@@ -488,6 +489,10 @@ project('spring-rabbit-stream') {
488489
testImplementation "org.testcontainers:junit-jupiter"
489490
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
490491
testImplementation 'org.springframework:spring-webflux'
492+
testImplementation 'io.micrometer:micrometer-observation-test'
493+
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
494+
testImplementation 'io.micrometer:micrometer-tracing-test'
495+
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
491496
}
492497

493498
}
@@ -547,7 +552,7 @@ task prepareAsciidocBuild(type: Sync) {
547552
into "$buildDir/asciidoc"
548553
}
549554

550-
def observationInputDir = file('spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/micrometer').absolutePath
555+
def observationInputDir = project.rootDir.absolutePath
551556
def generatedDocsDir = file("$buildDir/docs/generated").absolutePath
552557

553558
task generateObservabilityDocs(type: JavaExec) {
@@ -564,7 +569,7 @@ task filterMetricsDocsContent(type: Copy) {
564569
include '_*.adoc'
565570
into generatedDocsDir
566571
rename { filename -> filename.replace '_', '' }
567-
filter { line -> line.replaceAll('org.springframework.amqp.rabbit.support.micrometer.', '').replaceAll('^Fully qualified n', 'N') }
572+
filter { line -> line.replaceAll('org.springframework.*.micrometer.', '').replaceAll('^Fully qualified n', 'N') }
568573
}
569574

570575
asciidoctorPdf {

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
3232
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
3333
import org.springframework.rabbit.stream.listener.adapter.StreamMessageListenerAdapter;
34+
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservationConvention;
3435
import org.springframework.util.Assert;
3536

3637
import com.rabbitmq.stream.Environment;
@@ -53,6 +54,8 @@ public class StreamRabbitListenerContainerFactory
5354

5455
private ContainerCustomizer<StreamListenerContainer> containerCustomizer;
5556

57+
private RabbitStreamListenerObservationConvention streamListenerObservationConvention;
58+
5659
/**
5760
* Construct an instance using the provided environment.
5861
* @param environment the environment.
@@ -87,6 +90,18 @@ public void setContainerCustomizer(ContainerCustomizer<StreamListenerContainer>
8790
this.containerCustomizer = containerCustomizer;
8891
}
8992

93+
/**
94+
* Set a {@link RabbitStreamListenerObservationConvention} that is used when receiving
95+
* native stream messages.
96+
* @param streamListenerObservationConvention the convention.
97+
* @since 3.0.5
98+
*/
99+
public void setStreamListenerObservationConvention(
100+
RabbitStreamListenerObservationConvention streamListenerObservationConvention) {
101+
102+
this.streamListenerObservationConvention = streamListenerObservationConvention;
103+
}
104+
90105
@Override
91106
public StreamListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint) {
92107
if (endpoint instanceof MethodRabbitListenerEndpoint && this.nativeListener) {
@@ -101,8 +116,12 @@ public StreamListenerContainer createListenerContainer(RabbitListenerEndpoint en
101116
StreamListenerContainer container = createContainerInstance();
102117
Advice[] adviceChain = getAdviceChain();
103118
JavaUtils.INSTANCE
119+
.acceptIfNotNull(getApplicationContext(), container::setApplicationContext)
104120
.acceptIfNotNull(this.consumerCustomizer, container::setConsumerCustomizer)
105-
.acceptIfNotNull(adviceChain, container::setAdviceChain);
121+
.acceptIfNotNull(adviceChain, container::setAdviceChain)
122+
.acceptIfNotNull(getMicrometerEnabled(), container::setMicrometerEnabled)
123+
.acceptIfNotNull(getObservationEnabled(), container::setObservationEnabled)
124+
.acceptIfNotNull(this.streamListenerObservationConvention, container::setObservationConvention);
106125
applyCommonOverrides(endpoint, container);
107126
if (this.containerCustomizer != null) {
108127
this.containerCustomizer.configure(container);

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

Lines changed: 89 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,13 +25,18 @@
2525

2626
import org.springframework.amqp.core.Message;
2727
import org.springframework.amqp.core.MessageListener;
28-
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
28+
import org.springframework.amqp.rabbit.listener.MicrometerHolder;
29+
import org.springframework.amqp.rabbit.listener.ObservableListenerContainer;
2930
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
31+
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
3032
import org.springframework.aop.framework.ProxyFactory;
3133
import org.springframework.aop.support.DefaultPointcutAdvisor;
32-
import org.springframework.beans.factory.BeanNameAware;
3334
import org.springframework.core.log.LogAccessor;
3435
import org.springframework.lang.Nullable;
36+
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservation;
37+
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservation.DefaultRabbitStreamListenerObservationConvention;
38+
import org.springframework.rabbit.stream.micrometer.RabbitStreamListenerObservationConvention;
39+
import org.springframework.rabbit.stream.micrometer.RabbitStreamMessageReceiverContext;
3540
import org.springframework.rabbit.stream.support.StreamMessageProperties;
3641
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
3742
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
@@ -41,6 +46,8 @@
4146
import com.rabbitmq.stream.Consumer;
4247
import com.rabbitmq.stream.ConsumerBuilder;
4348
import com.rabbitmq.stream.Environment;
49+
import io.micrometer.observation.Observation;
50+
import io.micrometer.observation.ObservationRegistry;
4451

4552
/**
4653
* A listener container for RabbitMQ Streams.
@@ -49,7 +56,7 @@
4956
* @since 2.4
5057
*
5158
*/
52-
public class StreamListenerContainer implements MessageListenerContainer, BeanNameAware {
59+
public class StreamListenerContainer extends ObservableListenerContainer {
5360

5461
protected LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
5562

@@ -67,10 +74,6 @@ public class StreamListenerContainer implements MessageListenerContainer, BeanNa
6774

6875
private int concurrency = 1;
6976

70-
private String listenerId;
71-
72-
private String beanName;
73-
7477
private boolean autoStartup = true;
7578

7679
private MessageListener messageListener;
@@ -79,6 +82,11 @@ public class StreamListenerContainer implements MessageListenerContainer, BeanNa
7982

8083
private Advice[] adviceChain;
8184

85+
private String streamName;
86+
87+
@Nullable
88+
private RabbitStreamListenerObservationConvention observationConvention;
89+
8290
/**
8391
* Construct an instance using the provided environment.
8492
* @param environment the environment.
@@ -108,6 +116,7 @@ public synchronized void setQueueNames(String... queueNames) {
108116
Assert.isTrue(queueNames != null && queueNames.length == 1, "Only one stream is supported");
109117
this.builder.stream(queueNames[0]);
110118
this.simpleStream = true;
119+
this.streamName = queueNames[0];
111120
}
112121

113122
/**
@@ -139,6 +148,7 @@ public synchronized void superStream(String streamName, String name, int consume
139148
.singleActiveConsumer()
140149
.name(name);
141150
this.superStream = true;
151+
this.streamName = streamName;
142152
}
143153

144154
/**
@@ -171,34 +181,6 @@ public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustom
171181
this.consumerCustomizer = consumerCustomizer;
172182
}
173183

174-
/**
175-
* The 'id' attribute of the listener.
176-
* @return the id (or the container bean name if no id set).
177-
*/
178-
@Nullable
179-
public String getListenerId() {
180-
return this.listenerId != null ? this.listenerId : this.beanName;
181-
}
182-
183-
@Override
184-
public void setListenerId(String listenerId) {
185-
this.listenerId = listenerId;
186-
}
187-
188-
/**
189-
* Return the bean name.
190-
* @return the bean name.
191-
*/
192-
@Nullable
193-
public String getBeanName() {
194-
return this.beanName;
195-
}
196-
197-
@Override
198-
public void setBeanName(String beanName) {
199-
this.beanName = beanName;
200-
}
201-
202184
@Override
203185
public void setAutoStartup(boolean autoStart) {
204186
this.autoStartup = autoStart;
@@ -226,6 +208,22 @@ public Object getMessageListener() {
226208
return this.messageListener;
227209
}
228210

211+
/**
212+
* Set a RabbitStreamListenerObservationConvention; used to add additional key/values
213+
* to observations when using a {@link StreamMessageListener}.
214+
* @param observationConvention the convention.
215+
* @since 3.0.5
216+
*/
217+
public void setObservationConvention(RabbitStreamListenerObservationConvention observationConvention) {
218+
this.observationConvention = observationConvention;
219+
}
220+
221+
@Override
222+
public void afterPropertiesSet() {
223+
checkMicrometer();
224+
checkObservation();
225+
}
226+
229227
@Override
230228
public synchronized boolean isRunning() {
231229
return this.consumers.size() > 0;
@@ -263,21 +261,72 @@ public synchronized void stop() {
263261
public void setupMessageListener(MessageListener messageListener) {
264262
adviseIfNeeded(messageListener);
265263
this.builder.messageHandler((context, message) -> {
264+
ObservationRegistry registry = getObservationRegistry();
265+
Object sample = null;
266+
MicrometerHolder micrometerHolder = getMicrometerHolder();
267+
if (micrometerHolder != null) {
268+
sample = micrometerHolder.start();
269+
}
270+
Observation observation =
271+
RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(this.observationConvention,
272+
DefaultRabbitStreamListenerObservationConvention.INSTANCE,
273+
() -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
274+
registry);
275+
Object finalSample = sample;
266276
if (this.streamListener != null) {
267-
this.streamListener.onStreamMessage(message, context);
277+
observation.observe(() -> {
278+
try {
279+
this.streamListener.onStreamMessage(message, context);
280+
if (finalSample != null) {
281+
micrometerHolder.success(finalSample, this.streamName);
282+
}
283+
}
284+
catch (RuntimeException rtex) {
285+
if (finalSample != null) {
286+
micrometerHolder.failure(finalSample, this.streamName, rtex.getClass().getSimpleName());
287+
}
288+
throw rtex;
289+
}
290+
catch (Exception ex) {
291+
if (finalSample != null) {
292+
micrometerHolder.failure(finalSample, this.streamName, ex.getClass().getSimpleName());
293+
}
294+
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
295+
}
296+
});
268297
}
269298
else {
270299
Message message2 = this.streamConverter.toMessage(message, new StreamMessageProperties(context));
271300
if (this.messageListener instanceof ChannelAwareMessageListener) {
272301
try {
273-
((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null);
302+
observation.observe(() -> {
303+
try {
304+
((ChannelAwareMessageListener) this.messageListener).onMessage(message2, null);
305+
if (finalSample != null) {
306+
micrometerHolder.success(finalSample, this.streamName);
307+
}
308+
}
309+
catch (RuntimeException rtex) {
310+
if (finalSample != null) {
311+
micrometerHolder.failure(finalSample, this.streamName,
312+
rtex.getClass().getSimpleName());
313+
}
314+
throw rtex;
315+
}
316+
catch (Exception ex) {
317+
if (finalSample != null) {
318+
micrometerHolder.failure(finalSample, this.streamName, ex.getClass().getSimpleName());
319+
}
320+
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
321+
}
322+
});
274323
}
275324
catch (Exception ex) { // NOSONAR
276-
this.logger.error(ex, "Listner threw an exception");
325+
this.logger.error(ex, "Listener threw an exception");
277326
}
278327
}
279328
else {
280-
this.messageListener.onMessage(message2);
329+
observation.observe(() -> this.messageListener.onMessage(message2));
281330
}
282331
}
283332
});

0 commit comments

Comments
 (0)