@@ -48,6 +48,10 @@ public class KafkaProducerInterceptor {
4848 public static final ThreadLocal <java .util .Deque <Properties >> TL_PROPS_STACK =
4949 ThreadLocal .withInitial (java .util .ArrayDeque ::new );
5050
51+ // ThreadLocal stack to pass original/optimized configuration maps from optimization phase to reporter creation.
52+ public static final ThreadLocal <java .util .Deque <ConfigInfo >> TL_CFG_STACK =
53+ ThreadLocal .withInitial (java .util .ArrayDeque ::new );
54+
5155 // Static initializer to start the shared collector if enabled
5256 static {
5357 if (!DISABLED ) {
@@ -211,6 +215,16 @@ public static void onExit(@Advice.This Object producer) {
211215 producerMetricsMap .put (producerId , metricsInfo );
212216 clientStatsReporters .put (producerId , reporter );
213217
218+ // Pop configuration info from ThreadLocal stack (if any) and attach to reporter
219+ java .util .Deque <ConfigInfo > cfgStack = TL_CFG_STACK .get ();
220+ ConfigInfo cfgInfo = cfgStack .isEmpty ()? null : cfgStack .pop ();
221+ if (cfgStack .isEmpty ()) {
222+ TL_CFG_STACK .remove ();
223+ }
224+ if (cfgInfo != null ) {
225+ reporter .setConfigurations (cfgInfo .originalConfig , cfgInfo .optimizedConfig );
226+ }
227+
214228 logger .debug ("Producer {} registered with shared metrics collector" , producerId );
215229 }
216230 } catch (Exception e ) {
@@ -662,6 +676,89 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
662676 info .updateLastStats (
663677 new CompressionStats (totalOutgoingBytes , prevStats .uncompressedBytes + uncompressedBytes ));
664678
679+ // Extract a snapshot of all producer metrics to include in stats reporting
680+ java .util .Map <String , Double > allMetricsSnapshot = new java .util .HashMap <>();
681+ try {
682+ java .util .Map <?, ?> rawMetricsMap = extractMetricsMap (metrics );
683+ if (rawMetricsMap != null ) {
684+ for (java .util .Map .Entry <?, ?> mEntry : rawMetricsMap .entrySet ()) {
685+ Object mKey = mEntry .getKey ();
686+ String group = null ;
687+ String namePart ;
688+ String keyString = null ;
689+ if (mKey == null ) continue ;
690+
691+ if (mKey .getClass ().getName ().endsWith ("MetricName" )) {
692+ try {
693+ java .lang .reflect .Method nameMethod = findMethod (mKey .getClass (), "name" );
694+ java .lang .reflect .Method groupMethod = findMethod (mKey .getClass (), "group" );
695+ namePart = (nameMethod != null ) ? nameMethod .invoke (mKey ).toString () : mKey .toString ();
696+ group = (groupMethod != null ) ? groupMethod .invoke (mKey ).toString () : "" ;
697+ if (!"producer-metrics" .equals (group )) {
698+ continue ; // skip non-producer groups
699+ }
700+ keyString = namePart ; // store without the producer-metrics prefix
701+ } catch (Exception ignored ) {}
702+ } else if (mKey instanceof String ) {
703+ keyString = mKey .toString ();
704+ if (!keyString .startsWith ("producer-metrics" )) {
705+ continue ; // skip
706+ }
707+ // strip the prefix (and the following dot if present)
708+ if (keyString .startsWith ("producer-metrics." )) {
709+ keyString = keyString .substring ("producer-metrics." .length ());
710+ } else if ("producer-metrics" .equals (keyString )) {
711+ continue ; // unlikely but skip bare prefix
712+ }
713+ }
714+ if (keyString == null ) continue ;
715+ double mVal = extractMetricValue (mEntry .getValue ());
716+ if (!Double .isNaN (mVal )) {
717+ allMetricsSnapshot .put (keyString , mVal );
718+ }
719+ }
720+ }
721+ } catch (Exception snapshotEx ) {
722+ // ignore snapshot errors
723+ }
724+
725+ // Update reporter with latest metrics snapshot
726+ reporter .updateProducerMetrics (allMetricsSnapshot );
727+
728+ // Aggregate topics written by this producer from producer-topic-metrics
729+ java .util .Set <String > newTopics = new java .util .HashSet <>();
730+ try {
731+ java .util .Map <?,?> rawMapForTopics = extractMetricsMap (metrics );
732+ if (rawMapForTopics != null ) {
733+ for (java .util .Map .Entry <?,?> me : rawMapForTopics .entrySet ()) {
734+ Object k = me .getKey ();
735+ if (k == null ) continue ;
736+ if (k .getClass ().getName ().endsWith ("MetricName" )) {
737+ try {
738+ java .lang .reflect .Method groupMethod = findMethod (k .getClass (), "group" );
739+ java .lang .reflect .Method tagsMethod = findMethod (k .getClass (), "tags" );
740+ if (groupMethod != null && tagsMethod != null ) {
741+ groupMethod .setAccessible (true );
742+ String g = groupMethod .invoke (k ).toString ();
743+ if ("producer-topic-metrics" .equals (g )) {
744+ tagsMethod .setAccessible (true );
745+ Object tagObj = tagsMethod .invoke (k );
746+ if (tagObj instanceof java .util .Map ) {
747+ Object topicObj = ((java .util .Map <?,?>)tagObj ).get ("topic" );
748+ if (topicObj != null ) newTopics .add (topicObj .toString ());
749+ }
750+ }
751+ }
752+ } catch (Exception ignore ) {}
753+ }
754+ }
755+ }
756+ } catch (Exception ignore ) {}
757+
758+ if (!newTopics .isEmpty ()) {
759+ reporter .addTopics (newTopics );
760+ }
761+
665762 // Report the compression statistics for this interval (delta)
666763 reporter .recordBatch (uncompressedBytes , compressedBytes );
667764
@@ -704,11 +801,11 @@ public double getCompressionRatio(Object metrics) {
704801 * Find direct compression metrics in the metrics map.
705802 */
706803 private double findDirectCompressionMetric (Map <?, ?> metricsMap ) {
707- // Look for compression metrics directly in the map
804+ // Look for compression metrics in the *producer-metrics* group only
708805 for (Map .Entry <?, ?> entry : metricsMap .entrySet ()) {
709806 Object key = entry .getKey ();
710807
711- // Handle keys that are MetricName objects
808+ // Handle MetricName keys
712809 if (key .getClass ().getName ().endsWith ("MetricName" )) {
713810 try {
714811 Method nameMethod = findMethod (key .getClass (), "name" );
@@ -721,33 +818,28 @@ private double findDirectCompressionMetric(Map<?, ?> metricsMap) {
721818 String name = nameMethod .invoke (key ).toString ();
722819 String group = groupMethod .invoke (key ).toString ();
723820
724- // Check for common compression metrics
725- if ((group .equals ("producer-metrics" ) || group .equals ("producer-topic-metrics" )) &&
726- (name .equals ("compression-rate-avg" ) || name .equals ("record-compression-rate" ) ||
727- name .equals ("compression-ratio" ))) {
821+ // Only accept metrics from producer-metrics group
822+ if (group .equals ("producer-metrics" ) &&
823+ (name .equals ("compression-rate-avg" ) || name .equals ("compression-ratio" ))) {
728824
729- logger .debug ("Found compression metric: {}.{}" , group , name );
730825 double value = extractMetricValue (entry .getValue ());
731826 if (value > 0 ) {
732- logger .debug ("Compression ratio value : {}" , value );
827+ logger .debug ("Found producer-metrics compression metric : {} -> {}" , name , value );
733828 return value ;
734829 }
735830 }
736831 }
737- } catch (Exception e ) {
738- // Ignore and continue checking other keys
832+ } catch (Exception ignored ) {
739833 }
740834 }
741835 // Handle String keys
742836 else if (key instanceof String ) {
743837 String keyStr = (String ) key ;
744- if ((keyStr .contains ("producer-metrics" ) || keyStr .contains ("producer-topic-metrics" )) &&
745- (keyStr .contains ("compression-rate" ) || keyStr .contains ("compression-ratio" ))) {
746-
747- logger .debug ("Found compression metric with string key: {}" , keyStr );
838+ if (keyStr .startsWith ("producer-metrics" ) &&
839+ (keyStr .contains ("compression-rate-avg" ) || keyStr .contains ("compression-ratio" ))) {
748840 double value = extractMetricValue (entry .getValue ());
749841 if (value > 0 ) {
750- logger .debug ("Compression ratio value : {}" , value );
842+ logger .debug ("Found producer-metrics compression metric (string key) : {} -> {}" , keyStr , value );
751843 return value ;
752844 }
753845 }
@@ -757,101 +849,57 @@ else if (key instanceof String) {
757849 }
758850
759851 /**
760- * Get the total outgoing bytes across all nodes from the metrics object .
761- * This metric exists per broker node and represents bytes after compression .
852+ * Get the total outgoing bytes for the *producer* (after compression) .
853+ * Uses producer-metrics group only to keep numbers per-producer rather than per-node .
762854 */
763855 private long getOutgoingBytesTotal (Object metrics ) {
764856 try {
765- // Extract the metrics map from the Metrics object
766857 Map <?, ?> metricsMap = extractMetricsMap (metrics );
767858 if (metricsMap != null ) {
768- // The outgoing-byte-total is in the producer-node-metrics group
769- String targetGroup = "producer-node-metrics" ;
770- String targetMetric = "outgoing-byte-total" ;
771- long totalBytes = 0 ;
772- boolean foundAnyNodeMetric = false ;
859+ String targetGroup = "producer-metrics" ;
860+ String [] candidateNames = {"outgoing-byte-total" , "byte-total" };
773861
774- // Iterate through all metrics
775862 for (Map .Entry <?, ?> entry : metricsMap .entrySet ()) {
776863 Object key = entry .getKey ();
777864
778- // Handle MetricName objects
865+ // MetricName keys
779866 if (key .getClass ().getName ().endsWith ("MetricName" )) {
780867 try {
781868 Method nameMethod = findMethod (key .getClass (), "name" );
782869 Method groupMethod = findMethod (key .getClass (), "group" );
783- Method tagsMethod = findMethod (key .getClass (), "tags" );
784-
785870 if (nameMethod != null && groupMethod != null ) {
786871 nameMethod .setAccessible (true );
787872 groupMethod .setAccessible (true );
788-
789873 String name = nameMethod .invoke (key ).toString ();
790874 String group = groupMethod .invoke (key ).toString ();
791875
792- // If this is a node metric with outgoing bytes
793- if (group .equals (targetGroup ) && name .equals (targetMetric )) {
794- foundAnyNodeMetric = true ;
795-
796- double value = extractMetricValue (entry .getValue ());
797-
798- // Get the node-id from tags if possible
799- String nodeId = "unknown" ;
800- if (tagsMethod != null ) {
801- tagsMethod .setAccessible (true );
802- Object tags = tagsMethod .invoke (key );
803- if (tags instanceof Map ) {
804- Object nodeIdObj = ((Map <?, ?>) tags ).get ("node-id" );
805- if (nodeIdObj != null ) {
806- nodeId = nodeIdObj .toString ();
876+ if (group .equals (targetGroup )) {
877+ for (String n : candidateNames ) {
878+ if (n .equals (name )) {
879+ double val = extractMetricValue (entry .getValue ());
880+ if (val > 0 ) {
881+ logger .debug ("Found producer-metrics {} = {}" , name , val );
882+ return (long ) val ;
807883 }
808884 }
809885 }
810-
811- logger .debug ("Found outgoing bytes for node {}: {}" , nodeId , value );
812- totalBytes += (long ) value ;
813- }
814-
815- // Fall back to producer-metrics.byte-total if needed
816- if (!foundAnyNodeMetric && group .equals ("producer-metrics" ) &&
817- (name .equals ("byte-total" ) || name .equals ("outgoing-byte-total" ))) {
818- double value = extractMetricValue (entry .getValue ());
819- logger .debug ("Found fallback byte metric: {}={}" , name , value );
820- // Save this value but keep looking for node-specific metrics
821- if (totalBytes == 0 ) {
822- totalBytes = (long ) value ;
823- }
824886 }
825887 }
826- } catch (Exception e ) {
827- logger .debug ("Error extracting metrics: {}" , e .getMessage ());
828- }
829- }
830- // Handle String keys
831- else if (key instanceof String ) {
888+ } catch (Exception ignored ) {}
889+ } else if (key instanceof String ) {
832890 String keyStr = (String ) key ;
833- if (keyStr .contains (targetGroup ) && keyStr .contains (targetMetric )) {
834- foundAnyNodeMetric = true ;
835- double value = extractMetricValue (entry .getValue ());
836- logger .debug ("Found outgoing bytes with string key {}: {}" , keyStr , value );
837- totalBytes += (long ) value ;
891+ if (keyStr .startsWith (targetGroup ) && (keyStr .contains ("outgoing-byte-total" ) || keyStr .contains ("byte-total" ))) {
892+ double val = extractMetricValue (entry .getValue ());
893+ if (val > 0 ) {
894+ logger .debug ("Found producer-metrics byte counter (string key) {} = {}" , keyStr , val );
895+ return (long ) val ;
896+ }
838897 }
839898 }
840899 }
841-
842- if (totalBytes > 0 ) {
843- if (foundAnyNodeMetric ) {
844- logger .debug ("Total outgoing bytes across all nodes: {}" , totalBytes );
845- } else {
846- logger .debug ("Using fallback byte metric total: {}" , totalBytes );
847- }
848- return totalBytes ;
849- }
850- } else {
851- logger .debug ("Could not extract metrics map from: {}" , metrics .getClass ().getName ());
852900 }
853901 } catch (Exception e ) {
854- logger .debug ("Error getting outgoing bytes total: {}" , e .getMessage (), e );
902+ logger .debug ("Error getting outgoing bytes total from producer-metrics : {}" , e .getMessage ());
855903 }
856904
857905 return 0 ;
@@ -942,4 +990,17 @@ public CompressionStats(long compressedBytes, long uncompressedBytes) {
942990 this .uncompressedBytes = uncompressedBytes ;
943991 }
944992 }
993+
994+ /**
995+ * Holder for original and optimized configuration maps passed between optimization
996+ * phase and stats reporter creation using ThreadLocal.
997+ */
998+ public static class ConfigInfo {
999+ public final java .util .Map <String ,Object > originalConfig ;
1000+ public final java .util .Map <String ,Object > optimizedConfig ;
1001+ public ConfigInfo (java .util .Map <String ,Object > orig , java .util .Map <String ,Object > opt ) {
1002+ this .originalConfig = orig ;
1003+ this .optimizedConfig = opt ;
1004+ }
1005+ }
9451006}
0 commit comments