Skip to content

Commit 6c5ca00

Browse files
committed
GH-3177: Optimize Observation when ObservationRegistry.NOOP
Fixes: #3177 There is some performance overhead when we deal with an `Observation` even if `ObservationRegistry.NOOP` * Fix respective `Observation` API usage to skip its calls when `ObservationRegistry.NOOP` **Auto-cherry-pick to `3.2.x`**
1 parent 8a6a70c commit 6c5ca00

File tree

4 files changed

+65
-36
lines changed

4 files changed

+65
-36
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,19 @@ public void setupMessageListener(MessageListener messageListener) {
323323
if (micrometerHolder != null) {
324324
sample = micrometerHolder.start();
325325
}
326-
Observation observation =
327-
RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(this.observationConvention,
328-
DefaultRabbitStreamListenerObservationConvention.INSTANCE,
329-
() -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
330-
registry);
326+
Observation observation = null;
327+
if (!registry.isNoop()) {
328+
observation =
329+
RabbitStreamListenerObservation.STREAM_LISTENER_OBSERVATION.observation(this.observationConvention,
330+
DefaultRabbitStreamListenerObservationConvention.INSTANCE,
331+
() -> new RabbitStreamMessageReceiverContext(message, getListenerId(), this.streamName),
332+
registry);
333+
}
334+
331335
Object finalSample = sample;
332336
StreamMessageListener streamListenerToUse = this.streamListener;
333337
if (streamListenerToUse != null) {
334-
observation.observe(() -> {
338+
Runnable listenerCall = () -> {
335339
try {
336340
streamListenerToUse.onStreamMessage(message, context);
337341
if (micrometerHolder != null && finalSample != null) {
@@ -350,13 +354,19 @@ public void setupMessageListener(MessageListener messageListener) {
350354
}
351355
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
352356
}
353-
});
357+
};
358+
if (observation != null) {
359+
observation.observe(listenerCall);
360+
}
361+
else {
362+
listenerCall.run();
363+
}
354364
}
355365
else {
356366
Message message2 = this.streamConverter.toMessage(message, new StreamMessageProperties(context));
357367
if (this.messageListener instanceof ChannelAwareMessageListener channelAwareMessageListener) {
358368
try {
359-
observation.observe(() -> {
369+
Runnable listenerCall = () -> {
360370
try {
361371
channelAwareMessageListener.onMessage(message2, null);
362372
if (micrometerHolder != null && finalSample != null) {
@@ -376,7 +386,13 @@ public void setupMessageListener(MessageListener messageListener) {
376386
}
377387
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
378388
}
379-
});
389+
};
390+
if (observation != null) {
391+
observation.observe(listenerCall);
392+
}
393+
else {
394+
listenerCall.run();
395+
}
380396
}
381397
catch (Exception ex) { // NOSONAR
382398
this.logger.error(ex, "Listener threw an exception");
@@ -385,7 +401,12 @@ public void setupMessageListener(MessageListener messageListener) {
385401
else {
386402
MessageListener messageListenerToUse = this.messageListener;
387403
Assert.state(messageListenerToUse != null, "'messageListener' or 'streamListener' is required");
388-
observation.observe(() -> messageListenerToUse.onMessage(message2));
404+
if (observation != null) {
405+
observation.observe(() -> messageListenerToUse.onMessage(message2));
406+
}
407+
else {
408+
messageListenerToUse.onMessage(message2);
409+
}
389410
}
390411
}
391412
});

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/producer/RabbitStreamTemplate.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application
8383
@SuppressWarnings("NullAway.Init")
8484
private String beanName;
8585

86-
private ProducerCustomizer producerCustomizer = (name, builder) -> { };
86+
private ProducerCustomizer producerCustomizer = (name, builder) -> {
87+
};
8788

8889
private boolean observationEnabled;
8990

@@ -107,7 +108,6 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
107108
this.streamName = streamName;
108109
}
109110

110-
111111
private Producer createOrGetProducer() {
112112
Producer producerToUse = this.producer;
113113
if (producerToUse == null) {
@@ -170,7 +170,6 @@ public void setSuperStreamRouting(Function<com.rabbitmq.stream.Message, String>
170170
}
171171
}
172172

173-
174173
/**
175174
* Set a converter for {@link #convertAndSend(Object)} operations.
176175
* @param messageConverter the converter.
@@ -243,13 +242,11 @@ public MessageConverter messageConverter() {
243242
return this.messageConverter;
244243
}
245244

246-
247245
@Override
248246
public StreamMessageConverter streamMessageConverter() {
249247
return this.streamConverter;
250248
}
251249

252-
253250
@Override
254251
public CompletableFuture<Boolean> send(Message message) {
255252
CompletableFuture<Boolean> future = new CompletableFuture<>();
@@ -271,27 +268,32 @@ public CompletableFuture<Boolean> convertAndSend(Object message, @Nullable Messa
271268
return send(message2);
272269
}
273270

274-
275271
@Override
276272
public CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message) {
277273
CompletableFuture<Boolean> future = new CompletableFuture<>();
278274
observeSend(message, future);
279275
return future;
280276
}
281277

282-
@SuppressWarnings({ "NullAway", "try" }) // Dataflow analysis limitation
278+
@SuppressWarnings({"NullAway", "try"}) // Dataflow analysis limitation
283279
private void observeSend(com.rabbitmq.stream.Message message, CompletableFuture<Boolean> future) {
284-
Observation observation = RabbitStreamTemplateObservation.STREAM_TEMPLATE_OBSERVATION.observation(
285-
this.observationConvention, DefaultRabbitStreamTemplateObservationConvention.INSTANCE,
286-
() -> new RabbitStreamMessageSenderContext(message, this.beanName, this.streamName),
287-
obtainObservationRegistry());
288-
observation.start();
280+
ObservationRegistry registry = obtainObservationRegistry();
281+
Observation observation = null;
282+
if (registry != null && !registry.isNoop()) {
283+
observation = RabbitStreamTemplateObservation.STREAM_TEMPLATE_OBSERVATION.observation(
284+
this.observationConvention, DefaultRabbitStreamTemplateObservationConvention.INSTANCE,
285+
() -> new RabbitStreamMessageSenderContext(message, this.beanName, this.streamName),
286+
registry);
287+
observation.start();
288+
}
289289
try {
290290
createOrGetProducer().send(message, handleConfirm(future, observation));
291291
}
292292
catch (Exception ex) {
293-
observation.error(ex);
294-
observation.stop();
293+
if (observation != null) {
294+
observation.error(ex);
295+
observation.stop();
296+
}
295297
future.completeExceptionally(ex);
296298
}
297299
}
@@ -312,11 +314,13 @@ public MessageBuilder messageBuilder() {
312314
return createOrGetProducer().messageBuilder();
313315
}
314316

315-
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Observation observation) {
317+
private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, @Nullable Observation observation) {
316318
return confStatus -> {
317319
if (confStatus.isConfirmed()) {
318320
future.complete(true);
319-
observation.stop();
321+
if (observation != null) {
322+
observation.stop();
323+
}
320324
}
321325
else {
322326
int code = confStatus.getCode();
@@ -328,8 +332,10 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
328332
default -> "Unknown code: " + code;
329333
};
330334
StreamSendException ex = new StreamSendException(errorMessage, code);
331-
observation.error(ex);
332-
observation.stop();
335+
if (observation != null) {
336+
observation.error(ex);
337+
observation.stop();
338+
}
333339
future.completeExceptionally(ex);
334340
}
335341
};

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import com.rabbitmq.client.Return;
5151
import com.rabbitmq.client.ShutdownListener;
5252
import com.rabbitmq.client.ShutdownSignalException;
53-
import io.micrometer.observation.Observation;
5453
import io.micrometer.observation.ObservationRegistry;
5554
import org.jspecify.annotations.Nullable;
5655

@@ -2455,17 +2454,20 @@ public void doSend(Channel channel, @Nullable String exchangeArg, @Nullable Stri
24552454
}
24562455

24572456
protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey) {
2458-
24592457
if (!this.observationRegistryObtained && this.observationEnabled) {
24602458
obtainObservationRegistry(this.applicationContext);
24612459
this.observationRegistryObtained = true;
24622460
}
24632461
ObservationRegistry registry = getObservationRegistry();
2464-
Observation observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
2465-
DefaultRabbitTemplateObservationConvention.INSTANCE,
2466-
() -> new RabbitMessageSenderContext(message, this.beanName, exch, rKey), registry);
2467-
2468-
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
2462+
if (registry.isNoop()) {
2463+
sendToRabbit(channel, exch, rKey, mandatory, message);
2464+
}
2465+
else {
2466+
RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention,
2467+
DefaultRabbitTemplateObservationConvention.INSTANCE,
2468+
() -> new RabbitMessageSenderContext(message, this.beanName, exch, rKey), registry)
2469+
.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, message));
2470+
}
24692471
}
24702472

24712473
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ protected void invokeErrorHandler(Throwable ex) {
15101510
protected void executeListener(Channel channel, Object data) {
15111511
Observation observation;
15121512
ObservationRegistry registry = getObservationRegistry();
1513-
if (data instanceof Message message) {
1513+
if (data instanceof Message message && !registry.isNoop()) {
15141514
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(this.observationConvention,
15151515
DefaultRabbitListenerObservationConvention.INSTANCE,
15161516
() -> new RabbitMessageReceiverContext(message, getListenerId()), registry);

0 commit comments

Comments
 (0)