2727
2828package com .techempower .gemini .cluster .jms ;
2929
30+ import java .util .*;
3031import javax .jms .*;
31-
3232import org .slf4j .*;
33-
3433import com .techempower .cache .*;
3534import com .techempower .collection .relation .*;
3635import com .techempower .data .*;
@@ -51,6 +50,8 @@ public class CacheMessageManager
5150{
// Destination name used for cache messages.
// NOTE(review): presumably the JMS topic name - confirm against the connection setup.
5251 public static final String CACHE_TOPIC_DESTINATION = "CACHE.TOPIC" ;
// Name of a message property.
// NOTE(review): presumably carries the sending instance's UUID - confirm against the sender code.
5352 public static final String MESSAGE_PROPERTY_UUID = "Gemini.CacheMgr.ClientUUID" ;
// Default length, in minutes, of one statistics reporting period.
53+ public static final long DEFAULT_STATS_PERIOD_MINUTES = 10 ;
// Default threshold, in milliseconds, that a new per-key maximum timing must
// exceed before it is logged individually.
54+ public static final long DEFAULT_STATS_LOG_MAX_THRESHOLD_MS = 10 ;
5455
5556 //
5657 // Variables.
@@ -68,6 +69,8 @@ public class CacheMessageManager
// Identifier of this instance.
// NOTE(review): appears to be compared against the sender UUID of incoming
// messages - confirm against onMessage.
6869 private String instanceID ;
// Defaults to 10000; can be overridden in configure().
6970 private int maximumRelationSize = 10000 ;
// JMS delivery mode; defaults to persistent and can be overridden in configure().
7071 private int deliveryMode = DeliveryMode .PERSISTENT ;
// Length of one statistics reporting period, in minutes; can be overridden in configure().
72+ private long statsPeriodMinutes = DEFAULT_STATS_PERIOD_MINUTES ;
// A new maximum timing is logged individually only when it exceeds this many
// milliseconds; can be overridden in configure().
73+ private long statsLogMaxThresholdMs = DEFAULT_STATS_LOG_MAX_THRESHOLD_MS ;
7174
7275 //
7376 // Methods.
@@ -92,7 +95,13 @@ public void configure(EnhancedProperties props)
9295 String propsPrefix = "CacheMessageManager." ;
9396 this .maximumRelationSize = props .getInt (
9497 propsPrefix + "MaximumRelationSize" , this .maximumRelationSize );
98+ log .info ("[CacheMessageManager.MaximumRelationSize: " + maximumRelationSize + "]" );
9599 this .deliveryMode = props .getInt (propsPrefix + "DeliveryMode" , this .deliveryMode );
100+ log .info ("[CacheMessageManager.DeliveryMode: " + deliveryMode + "]" );
101+ this .statsPeriodMinutes = props .getLong ("StatsPeriodMinutes" , DEFAULT_STATS_PERIOD_MINUTES );
102+ log .info ("[CacheMessageManager.StatsPeriodMinutes: " + statsPeriodMinutes + "]" );
103+ this .statsLogMaxThresholdMs = props .getLong ("StatsLogMaxThresholdMs" , DEFAULT_STATS_LOG_MAX_THRESHOLD_MS );
104+ log .info ("[CacheMessageManager.StatsLogMaxThresholdMs: " + statsLogMaxThresholdMs + "]" );
96105 }
97106
98107 /**
@@ -383,6 +392,17 @@ public void reset(long relationID)
383392 private class CacheSignalListener
384393 implements MessageListener
385394 {
// Wall-clock time (ms) at which the current statistics reporting period began;
// reset each time a summary is logged.
395+ private long statsCollectionStart = System .currentTimeMillis ();
396+ // No need for concurrent data structures since a listener will only get
397+ // one message at a time.
// All maps below are keyed by a stats key: "g" + group id for cache messages,
// "r" + relation id for cached-relation messages. They are cleared after each
// summary is logged.
// Number of messages received per key in the current period.
398+ private Map <String , Long > statsCount = new HashMap <>();
// Tx = transmission time: receive time minus the message's JMS timestamp (ms).
399+ private Map <String , Long > statsTxMin = new HashMap <>();
400+ private Map <String , Long > statsTxMax = new HashMap <>();
401+ private Map <String , Long > statsTxSum = new HashMap <>();
// Px = processing time: how long the receiver took to act on the message (ms).
402+ private Map <String , Long > statsPxMin = new HashMap <>();
403+ private Map <String , Long > statsPxMax = new HashMap <>();
404+ private Map <String , Long > statsPxSum = new HashMap <>();
// Total time (ms) spent on the statistics bookkeeping itself in the current period.
405+ private long statsCollectionMs = 0L ;
386406
387407 public CacheSignalListener (GeminiApplication application )
388408 {
@@ -399,6 +419,8 @@ public void onMessage(javax.jms.Message message)
399419 log .debug ("EntityStore is not yet initialized. Ignoring message." );
400420 return ;
401421 }
422+ long start = System .currentTimeMillis ();
423+ String statsKey = null ;
402424 BroadcastMessage broadcastMessage = null ;
403425 // cast object to BroadcastMessage
404426 if (message instanceof ObjectMessage )
@@ -441,6 +463,8 @@ else if (senderUuid.equals(instanceID))
441463 if (broadcastMessage instanceof CacheMessage )
442464 {
443465 final CacheMessage cacheMessage = (CacheMessage )broadcastMessage ;
466+ statsKey = "g" + cacheMessage .getGroupId ();
467+
444468 // handle the message
445469 switch (cacheMessage .getAction ())
446470 {
@@ -540,6 +564,7 @@ else if (broadcastMessage instanceof CachedRelationMessage)
540564 {
541565 final CachedRelationMessage cachedRelationMessage = (CachedRelationMessage )broadcastMessage ;
542566 final CachedRelation <?, ?> relation = store .getCachedRelation (cachedRelationMessage .getRelationId ());
567+ statsKey = "r" + cachedRelationMessage .getRelationId ();
543568 switch (cachedRelationMessage .getAction ())
544569 {
545570 case (CachedRelationMessage .ACTION_ADD ):
@@ -610,6 +635,163 @@ else if (broadcastMessage instanceof CachedRelationMessage)
610635 }
611636 }
612637 }
638+
639+ if (statsKey != null ) {
640+ // Gather statistics on transmission and receiver processing timings and periodically log a
641+ // summary.
642+ try {
643+ // Track how long it takes to gather the stats.
644+ long startStats = System .currentTimeMillis ();
645+
646+ // Processing time: How long it took to take action on the message.
647+ long pxTime = System .currentTimeMillis () - start ;
648+
649+ // Transmission time: How long it took to receive the message.
650+ long txTime = start - message .getJMSTimestamp ();
651+
652+ // Count: How many messages have been received during the reporting period.
653+ statsCount .merge (statsKey , 1L , Long ::sum );
654+
655+ // Transmission sum: Sum of transmission times during the reporting period.
656+ statsTxSum .merge (statsKey , txTime , Long ::sum );
657+
658+ // Transmission max: Longest recorded transmission time during the reporting period.
659+ long txMax = statsTxMax .getOrDefault (statsKey , -1L );
660+ if (txTime > txMax ) {
661+ statsTxMax .put (statsKey , txTime );
662+ // Log the new max to help with investigating performance problems.
663+ if (txTime > statsLogMaxThresholdMs ) {
664+ // To reduce logging noise, only log new max values that exceed the configured
665+ // threshold.
666+ log .info ("Stats for " + statsKey + ": New transmission max of " + txTime + ": " + message );
667+ }
668+ }
669+
670+ // Transmission min: Shortest recorded transmission time during the reporting period.
671+ long txMin = statsTxMin .getOrDefault (statsKey , -1L );
672+ if (txMin == -1L || txTime < txMin ) {
673+ statsTxMin .put (statsKey , txTime );
674+ }
675+
676+ // Processing sum: Sum of processing times during the reporting period.
677+ statsPxSum .merge (statsKey , pxTime , Long ::sum );
678+
679+ // Processing max: Longest recorded processing time during the reporting period.
680+ long pxMax = statsPxMax .getOrDefault (statsKey , -1L );
681+ if (pxTime > pxMax ) {
682+ statsPxMax .put (statsKey , pxTime );
683+ // Log the new max to help with investigating performance problems.
684+ if (pxTime > statsLogMaxThresholdMs ) {
685+ // To reduce logging noise, only log new max values that exceed the configured
686+ // threshold.
687+ log .info ("Stats for " + statsKey + ": New receiver processing max of " + pxTime + ": " + message );
688+ }
689+ }
690+
691+ // Processing min: Shortest recorded processing time during the reporting period.
692+ long pxMin = statsPxMin .getOrDefault (statsKey , -1L );
693+ if (pxMin == -1L || pxTime < pxMin ) {
694+ statsPxMin .put (statsKey , pxTime );
695+ }
696+
697+ // Record time spent gathering stats.
698+ statsCollectionMs += System .currentTimeMillis () - startStats ;
699+
700+ // Periodically log collected stats and reset accumulators.
701+ if ((System .currentTimeMillis () - statsCollectionStart ) >
702+ (statsPeriodMinutes * UtilityConstants .MINUTE )) {
703+
704+ // Log stats for each CacheGroup/CachedRelation that was received during the last period.
705+ for (String key : statsCount .keySet ()) {
706+ // Count: How many messages have been received during the reporting period.
707+ long keyCount = statsCount .getOrDefault (key , -1L );
708+
709+ // Transmission sum: Sum of transmission times during the reporting period.
710+ long keyTxSum = statsTxSum .getOrDefault (key , -1L );
711+
712+ // Transmission avg: Average of transmission times during the reporting period.
713+ long keyTxAvg = keyCount > 0 ? keyTxSum / keyCount : 0 ;
714+
715+ // Transmission max: Longest recorded transmission time during the reporting period.
716+ long keyTxMax = statsTxMax .getOrDefault (key , -1L );
717+
718+ // Transmission min: Shortest recorded transmission time during the reporting period.
719+ long keyTxMin = statsTxMin .getOrDefault (key , -1L );;
720+
721+ // Processing sum: Sum of processing times during the reporting period.
722+ long keyPxSum = statsPxSum .getOrDefault (key , -1L );;
723+
724+ // Processing avg: Average of processing times during the reporting period.
725+ long keyPxAvg = keyCount > 0 ? keyPxSum / keyCount : 0 ;
726+
727+ // Processing max: Longest recorded processing time during the reporting period.
728+ long keyPxMax = statsPxMax .getOrDefault (key , -1L );;
729+
730+ // Processing min: Shortest recorded processing time during the reporting period.
731+ long keyPxMin = statsPxMin .getOrDefault (key , -1L );
732+
733+ // Log summary of collected statistics.
734+ log .info ("Stats summary for " + key + ": count " + keyCount
735+ + " transmission (max/min/avg) " + keyTxMax
736+ + " " + keyTxMin
737+ + " " + keyTxAvg
738+ + " receiver processing (max/min/avg) " + keyPxMax
739+ + " " + keyPxMin
740+ + " " + keyPxAvg );
741+ }
742+
743+ // Count: How many messages have been received since last reset.
744+ long overallCount = statsCount .values ().stream ().mapToLong (Long ::longValue ).sum ();
745+
746+ // Transmission sum: Sum of transmission times during the reporting period.
747+ long overallTxSum = statsTxSum .values ().stream ().mapToLong (Long ::longValue ).sum ();
748+
749+ // Transmission avg: Average of transmission times during the reporting period.
750+ long overallTxAvg = overallCount > 0 ? overallTxSum / overallCount : 0 ;
751+
752+ // Transmission max: Longest recorded transmission time during the reporting period.
753+ long overallTxMax = statsTxMax .values ().stream ().mapToLong (Long ::longValue ).max ().orElse (-1 );
754+
755+ // Transmission min: Shortest recorded transmission time during the reporting period.
756+ long overallTxMin = statsTxMin .values ().stream ().mapToLong (Long ::longValue ).min ().orElse (-1 );
757+
758+ // Processing sum: Sum of processing times during the reporting period.
759+ long overallPxSum = statsPxSum .values ().stream ().mapToLong (Long ::longValue ).sum ();
760+
761+ // Processing avg: Average of processing times during the reporting period.
762+ long overallPxAvg = overallCount > 0 ? overallPxSum / overallCount : 0 ;
763+
764+ // Processing max: Longest recorded processing time during the reporting period.
765+ long overallPxMax = statsPxMax .values ().stream ().mapToLong (Long ::longValue ).max ().orElse (-1 );
766+
767+ // Processing min: Shortest recorded processing time during the reporting period.
768+ long overallPxMin = statsPxMin .values ().stream ().mapToLong (Long ::longValue ).min ().orElse (-1 );
769+
770+ // Log summary of collected statistics.
771+ log .info ("Stats summary overall: count " + overallCount
772+ + " transmission (max/min/avg) " + overallTxMax
773+ + " " + overallTxMin
774+ + " " + overallTxAvg
775+ + " receiver processing (max/min/avg) " + overallPxMax
776+ + " " + overallPxMin
777+ + " " + overallPxAvg
778+ + " stats collection time: " + statsCollectionMs );
779+
780+ // Reset all accumulators.
781+ statsCount .clear ();
782+ statsTxMin .clear ();
783+ statsTxMax .clear ();
784+ statsTxSum .clear ();
785+ statsPxMin .clear ();
786+ statsPxMax .clear ();
787+ statsPxSum .clear ();
788+ statsCollectionMs = 0L ;
789+ statsCollectionStart = System .currentTimeMillis ();
790+ }
791+ } catch (Exception e ) {
792+ log .error ("Unable to gather statistics" , e );
793+ }
794+ }
613795 }
614796 }
615797}
0 commit comments