2626import java .util .Map .Entry ;
2727import java .util .Properties ;
2828import java .util .Set ;
29+ import java .util .concurrent .ConcurrentHashMap ;
30+ import java .util .concurrent .ConcurrentMap ;
2931import java .util .concurrent .CopyOnWriteArrayList ;
3032import java .util .concurrent .Executor ;
3133import java .util .stream .Collectors ;
8082import org .springframework .transaction .interceptor .TransactionAttribute ;
8183import org .springframework .transaction .support .TransactionSynchronizationManager ;
8284import org .springframework .util .Assert ;
85+ import org .springframework .util .ClassUtils ;
8386import org .springframework .util .ErrorHandler ;
8487import org .springframework .util .StringUtils ;
8588import org .springframework .util .backoff .BackOff ;
8689import org .springframework .util .backoff .FixedBackOff ;
8790
8891import com .rabbitmq .client .Channel ;
8992import com .rabbitmq .client .ShutdownSignalException ;
93+ import io .micrometer .core .instrument .MeterRegistry ;
94+ import io .micrometer .core .instrument .Timer ;
95+ import io .micrometer .core .instrument .Timer .Builder ;
96+ import io .micrometer .core .instrument .Timer .Sample ;
9097
9198/**
9299 * @author Mark Pollack
@@ -119,12 +126,17 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
119126
120127 public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000 ;
121128
129+ private static final boolean MICROMETER_PRESENT = ClassUtils .isPresent (
130+ "io.micrometer.core.instrument.MeterRegistry" , AbstractMessageListenerContainer .class .getClassLoader ());
131+
122132 private final ContainerDelegate delegate = this ::actualInvokeListener ;
123133
124134 protected final Object consumersMonitor = new Object (); //NOSONAR
125135
126136 private final Map <String , Object > consumerArgs = new HashMap <>();
127137
138+ private final Map <String , String > micrometerTags = new HashMap <>();
139+
128140 private ContainerDelegate proxy = this .delegate ;
129141
130142 private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT ;
@@ -224,6 +236,10 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
224236
225237 private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy (0 , 0 , 0L );
226238
239+ private MicrometerHolder micrometerHolder ;
240+
241+ private boolean micrometerEnabled = true ;
242+
227243 private volatile boolean lazyLoad ;
228244
229245 @ Override
@@ -631,12 +647,16 @@ public void setForceCloseChannel(boolean forceCloseChannel) {
631647 @ Nullable
632648 protected String getRoutingLookupKey () {
633649 return super .getConnectionFactory () instanceof RoutingConnectionFactory
634- ? this .lookupKeyQualifier + "[" + this .queues .stream ()
635- .map (Queue ::getName )
636- .collect (Collectors .joining ("," )) + "]"
650+ ? this .lookupKeyQualifier + queuesAsListString ()
637651 : null ;
638652 }
639653
654+ private String queuesAsListString () {
655+ return "[" + this .queues .stream ()
656+ .map (Queue ::getName )
657+ .collect (Collectors .joining ("," )) + "]" ;
658+ }
659+
640660 /**
641661 * Return the (@link RoutingConnectionFactory} if the connection factory is a
642662 * {@link RoutingConnectionFactory}; null otherwise.
@@ -1075,6 +1095,26 @@ protected Collection<MessagePostProcessor> getAfterReceivePostProcessors() {
10751095 return this .afterReceivePostProcessors ;
10761096 }
10771097
1098+ /**
1099+ * Set additional tags for the Micrometer listener timers.
1100+ * @param tags the tags.
1101+ * @since 2.2
1102+ */
1103+ public void setMicrometerTags (Map <String , String > tags ) {
1104+ if (tags != null ) {
1105+ this .micrometerTags .putAll (tags );
1106+ }
1107+ }
1108+
1109+ /**
1110+ * Set to false to disable micrometer listener timers.
1111+ * @param micrometerEnabled false to disable.
1112+ * @since 2.2
1113+ */
1114+ public void setMicrometerEnabled (boolean micrometerEnabled ) {
1115+ this .micrometerEnabled = micrometerEnabled ;
1116+ }
1117+
10781118 /**
10791119 * Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
10801120 */
@@ -1093,6 +1133,16 @@ public final void afterPropertiesSet() {
10931133 "channelTransacted=false" );
10941134 validateConfiguration ();
10951135 initialize ();
1136+ try {
1137+ if (this .micrometerHolder == null && MICROMETER_PRESENT && this .micrometerEnabled
1138+ && this .applicationContext != null ) {
1139+ this .micrometerHolder = new MicrometerHolder (this .applicationContext , getListenerId (),
1140+ this .micrometerTags );
1141+ }
1142+ }
1143+ catch (IllegalStateException e ) {
1144+ this .logger .debug ("Could not enable micrometer timers" , e );
1145+ }
10961146 }
10971147
10981148 @ Override
@@ -1128,6 +1178,9 @@ protected void initializeProxy(Object delegate) {
11281178 @ Override
11291179 public void destroy () {
11301180 shutdown ();
1181+ if (this .micrometerHolder != null ) {
1182+ this .micrometerHolder .destroy ();
1183+ }
11311184 }
11321185
11331186 // -------------------------------------------------------------------------
@@ -1350,10 +1403,24 @@ protected void executeListener(Channel channel, Object data) {
13501403 }
13511404 throw new MessageRejectedWhileStoppingException ();
13521405 }
1406+ Object sample = null ;
1407+ if (this .micrometerHolder != null ) {
1408+ sample = this .micrometerHolder .start ();
1409+ }
13531410 try {
13541411 doExecuteListener (channel , data );
1412+ if (sample != null ) {
1413+ this .micrometerHolder .success (sample , data instanceof Message
1414+ ? ((Message ) data ).getMessageProperties ().getConsumerQueue ()
1415+ : queuesAsListString ());
1416+ }
13551417 }
13561418 catch (RuntimeException ex ) {
1419+ if (sample != null ) {
1420+ this .micrometerHolder .failure (sample , data instanceof Message
1421+ ? ((Message ) data ).getMessageProperties ().getConsumerQueue ()
1422+ : queuesAsListString (), ex .getClass ().getSimpleName ());
1423+ }
13571424 Message message ;
13581425 if (data instanceof Message ) {
13591426 message = (Message ) data ;
@@ -1908,4 +1975,72 @@ else if (!RabbitUtils.isNormalChannelClose(cause)) {
19081975
19091976 }
19101977
1978+ private static final class MicrometerHolder {
1979+
1980+ private final ConcurrentMap <String , Timer > timers = new ConcurrentHashMap <>();
1981+
1982+ private final MeterRegistry registry ;
1983+
1984+ private final Map <String , String > tags ;
1985+
1986+ private final String listenerId ;
1987+
1988+ MicrometerHolder (@ Nullable ApplicationContext context , String listenerId , Map <String , String > tags ) {
1989+ if (context == null ) {
1990+ throw new IllegalStateException ("No micrometer registry present" );
1991+ }
1992+ Map <String , MeterRegistry > registries = context .getBeansOfType (MeterRegistry .class , false , false );
1993+ if (registries .size () == 1 ) {
1994+ this .registry = registries .values ().iterator ().next ();
1995+ this .listenerId = listenerId ;
1996+ this .tags = tags ;
1997+ }
1998+ else {
1999+ throw new IllegalStateException ("No micrometer registry present" );
2000+ }
2001+ }
2002+
2003+ Object start () {
2004+ return Timer .start (this .registry );
2005+ }
2006+
2007+ void success (Object sample , String queue ) {
2008+ Timer timer = this .timers .get (queue + "none" );
2009+ if (timer == null ) {
2010+ timer = buildTimer (this .listenerId , "success" , queue , "none" );
2011+ }
2012+ ((Sample ) sample ).stop (timer );
2013+ }
2014+
2015+ void failure (Object sample , String queue , String exception ) {
2016+ Timer timer = this .timers .get (queue + exception );
2017+ if (timer == null ) {
2018+ timer = buildTimer (this .listenerId , "failure" , queue , exception );
2019+ }
2020+ ((Sample ) sample ).stop (timer );
2021+ }
2022+
2023+ private Timer buildTimer (String aListenerId , String result , String queue , String exception ) {
2024+
2025+ Builder builder = Timer .builder ("spring.rabbitmq.listener" )
2026+ .description ("Spring RabbitMQ Listener" )
2027+ .tag ("listener.id" , aListenerId )
2028+ .tag ("queue" , queue )
2029+ .tag ("result" , result )
2030+ .tag ("exception" , exception );
2031+ if (this .tags != null && !this .tags .isEmpty ()) {
2032+ this .tags .forEach ((key , value ) -> builder .tag (key , value ));
2033+ }
2034+ Timer registeredTimer = builder .register (this .registry );
2035+ this .timers .put (queue + exception , registeredTimer );
2036+ return registeredTimer ;
2037+ }
2038+
2039+ void destroy () {
2040+ this .timers .values ().forEach (this .registry ::remove );
2041+ this .timers .clear ();
2042+ }
2043+
2044+ }
2045+
19112046}
0 commit comments