99import java .lang .reflect .Method ;
1010import java .util .Properties ;
1111import java .util .Map ;
12- import java .util .List ;
1312import java .util .concurrent .ConcurrentHashMap ;
1413import java .util .concurrent .ScheduledExecutorService ;
1514import java .util .concurrent .Executors ;
@@ -48,6 +47,13 @@ public class KafkaProducerInterceptor {
4847 public static final ThreadLocal <java .util .Deque <Properties >> TL_PROPS_STACK =
4948 ThreadLocal .withInitial (java .util .ArrayDeque ::new );
5049
50+ // ThreadLocal stack to hold the producer UUIDs generated in onEnter so that the same
51+ // value can be reused later in onExit (for stats reporting) and by SuperstreamManager
52+ // when it reports the client information. The stack is aligned with the TL_PROPS_STACK
53+ // (push in onEnter, pop in onExit).
54+ public static final ThreadLocal <java .util .Deque <String >> TL_UUID_STACK =
55+ ThreadLocal .withInitial (java .util .ArrayDeque ::new );
56+
5157 // ThreadLocal stack to pass original/optimized configuration maps from optimization phase to reporter creation.
5258 public static final ThreadLocal <java .util .Deque <ConfigInfo >> TL_CFG_STACK =
5359 ThreadLocal .withInitial (java .util .ArrayDeque ::new );
@@ -57,12 +63,11 @@ public class KafkaProducerInterceptor {
5763 if (!DISABLED ) {
5864 try {
5965 sharedCollector .start ();
60- logger .info ("Superstream metrics collector initialized and started successfully" );
6166 } catch (Exception e ) {
62- logger .error ("Failed to start metrics collector: " + e .getMessage (), e );
67+ logger .error ("[ERR-001] Failed to start metrics collector: {}" , e .getMessage (), e );
6368 }
6469 } else {
65- logger .info ("Superstream metrics collection is disabled via SUPERSTREAM_DISABLED environment variable" );
70+ logger .warn ("Superstream is disabled via SUPERSTREAM_DISABLED environment variable" );
6671 }
6772 }
6873
@@ -109,8 +114,12 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
109114 }
110115
111116 TL_PROPS_STACK .get ().push (properties );
117+
118+ // Generate a UUID for this upcoming producer instance and push onto UUID stack
119+ String producerUuid = java .util .UUID .randomUUID ().toString ();
120+ TL_UUID_STACK .get ().push (producerUuid );
112121 } else {
113- logger .error ("Could not extract properties from producer arguments" );
122+ logger .error ("[ERR-002] Could not extract properties from producer arguments" );
114123 return ;
115124 }
116125
@@ -126,7 +135,7 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
126135 }
127136
128137 if (properties == null || properties .isEmpty ()) {
129- logger .error ("Could not extract properties from properties" );
138+ logger .error ("[ERR-003] Could not extract properties from properties" );
130139 return ;
131140 }
132141
@@ -137,36 +146,21 @@ public static void onEnter(@Advice.AllArguments Object[] args) {
137146 return ;
138147 }
139148
140- logger .info ("Intercepted KafkaProducer constructor" );
141-
142149 // Extract bootstrap servers
143150 String bootstrapServers = properties .getProperty ("bootstrap.servers" );
144151 if (bootstrapServers == null || bootstrapServers .trim ().isEmpty ()) {
145- logger .warn ( " bootstrap.servers is not set, cannot optimize" );
152+ logger .error ( "[ERR-004] bootstrap.servers is not set, cannot optimize" );
146153 return ;
147154 }
148155
149156 // Optimize the producer
150157 boolean success = SuperstreamManager .getInstance ().optimizeProducer (bootstrapServers , clientId , properties );
151158 if (!success ) {
152- /*
153- * Roll-back any changes we may have applied without wiping the whole
154- * Properties object first. Clearing the map created a tiny window in
155- * which application-code that already held the reference could observe
156- * an empty map – leading to NPEs further down the stack. Instead we
157- * simply overlay the original values which is functionally equivalent
158- * but keeps the map non-empty at all times.
159- */
160159 properties .putAll (originalProperties );
161160 }
162161 } catch (Exception e ) {
163- /*
164- * If optimisation threw, restore the original entries but keep the map
165- * non-empty so that any other thread holding a reference never sees an
166- * empty state.
167- */
168162 properties .putAll (originalProperties );
169- logger .error ("Error during producer optimization, restored original properties" , e );
163+ logger .error ("[ERR-005] Error during producer optimization, restored original properties: {}" , e . getMessage () , e );
170164 }
171165 }
172166
@@ -192,20 +186,30 @@ public static void onExit(@Advice.This Object producer) {
192186
193187 java .util .Deque <Properties > stack = TL_PROPS_STACK .get ();
194188 if (stack .isEmpty ()) {
195- logger .error ("No captured properties for this producer constructor; skipping stats reporter setup" );
189+ logger .error ("[ERR-006] No captured properties for this producer constructor; skipping stats reporter setup" );
196190 return ;
197191 }
198192
199193 Properties producerProps = stack .pop ();
200194
195+ // Retrieve matching UUID for this constructor instance
196+ java .util .Deque <String > uuidStack = TL_UUID_STACK .get ();
197+ String producerUuid = "" ;
198+ if (!uuidStack .isEmpty ()) {
199+ producerUuid = uuidStack .pop ();
200+ } else {
201+ logger .error ("[ERR-127] No producer UUID found for this constructor instance" );
202+ }
203+
201204 // Clean up ThreadLocal when outer-most constructor finishes
202205 if (stack .isEmpty ()) {
203206 TL_PROPS_STACK .remove ();
207+ TL_UUID_STACK .remove ();
204208 }
205209
206210 String bootstrapServers = producerProps .getProperty ("bootstrap.servers" );
207211 if (bootstrapServers == null || bootstrapServers .isEmpty ()) {
208- logger .error ("bootstrap.servers missing in captured properties; skipping reporter setup" );
212+ logger .error ("[ERR-007] bootstrap.servers missing in captured properties; skipping reporter setup" );
209213 return ;
210214 }
211215
@@ -228,29 +232,29 @@ public static void onExit(@Advice.This Object producer) {
228232 logger .debug ("Registering producer with metrics collector: {} (client.id='{}')" , producerId , clientIdForStats );
229233
230234 // Create a reporter for this producer instance – pass the original client.id
231- ClientStatsReporter reporter = new ClientStatsReporter (bootstrapServers , producerProps , clientIdForStats );
235+ ClientStatsReporter reporter = new ClientStatsReporter (bootstrapServers , producerProps , clientIdForStats , producerUuid );
232236
233- // Create metrics info for this producer
234- ProducerMetricsInfo metricsInfo = new ProducerMetricsInfo (producer , reporter );
237+ // Create metrics info for this producer
238+ ProducerMetricsInfo metricsInfo = new ProducerMetricsInfo (producer , reporter );
235239
236- // Register with the shared collector
237- producerMetricsMap .put (producerId , metricsInfo );
238- clientStatsReporters .put (producerId , reporter );
240+ // Register with the shared collector
241+ producerMetricsMap .put (producerId , metricsInfo );
242+ clientStatsReporters .put (producerId , reporter );
239243
240- // Pop configuration info from ThreadLocal stack (if any) and attach to reporter
241- java .util .Deque <ConfigInfo > cfgStack = TL_CFG_STACK .get ();
242- ConfigInfo cfgInfo = cfgStack .isEmpty ()? null : cfgStack .pop ();
243- if (cfgStack .isEmpty ()) {
244- TL_CFG_STACK .remove ();
245- }
246- if (cfgInfo != null ) {
247- reporter .setConfigurations (cfgInfo .originalConfig , cfgInfo .optimizedConfig );
248- }
244+ // Pop configuration info from ThreadLocal stack (if any) and attach to reporter
245+ java .util .Deque <ConfigInfo > cfgStack = TL_CFG_STACK .get ();
246+ ConfigInfo cfgInfo = cfgStack .isEmpty ()? null : cfgStack .pop ();
247+ if (cfgStack .isEmpty ()) {
248+ TL_CFG_STACK .remove ();
249+ }
250+ if (cfgInfo != null ) {
251+ reporter .setConfigurations (cfgInfo .originalConfig , cfgInfo .optimizedConfig );
252+ }
249253
250254 logger .debug ("Producer {} registered with shared metrics collector" , producerId );
251255 }
252256 } catch (Exception e ) {
253- logger .error ("Error registering producer with metrics collector: " + e .getMessage (), e );
257+ logger .error ("[ERR-008] Error registering producer with metrics collector: {}" , e .getMessage (), e );
254258 }
255259 }
256260
@@ -260,7 +264,7 @@ public static void onExit(@Advice.This Object producer) {
260264 public static Properties extractProperties (Object [] args ) {
261265 // Look for Properties or Map in the arguments
262266 if (args == null ) {
263- logger .error ("extractProperties: args array is null" );
267+ logger .error ("[ERR-009] extractProperties: args array is null" );
264268 return null ;
265269 }
266270
@@ -284,7 +288,7 @@ public static Properties extractProperties(Object[] args) {
284288
285289 // If the map is unmodifiable we cannot change producer configuration -> cannot optimise
286290 if (arg .getClass ().getName ().contains ("UnmodifiableMap" )) {
287- logger .error ("Cannot optimize KafkaProducer configuration: received an unmodifiable Map ({}). " +
291+ logger .error ("[ERR-010] Cannot optimize KafkaProducer configuration: received an unmodifiable Map ({}). " +
288292 "Please pass a mutable java.util.Properties or java.util.Map instead." ,
289293 arg .getClass ().getName ());
290294 return null ; // signal caller to skip optimisation
@@ -298,7 +302,7 @@ public static Properties extractProperties(Object[] args) {
298302 return props ;
299303 } catch (ClassCastException e ) {
300304 // Not the map type we expected
301- logger .error ("extractProperties: Could not cast Map to Map<String, Object>" , e );
305+ logger .error ("[ERR-011] extractProperties: Could not cast Map to Map<String, Object>: {}" , e . getMessage () , e );
302306 return null ;
303307 }
304308 }
@@ -334,7 +338,7 @@ public static Properties extractProperties(Object[] args) {
334338 }
335339 } catch (NoSuchFieldException e ) {
336340 // Field doesn't exist, try the next one
337- logger .error ("extractProperties: Field {} not found in ProducerConfig" , fieldName );
341+ logger .error ("[ERR-017] extractProperties: Field {} not found in ProducerConfig" , fieldName );
338342 continue ;
339343 }
340344 }
@@ -399,13 +403,14 @@ public static Properties extractProperties(Object[] args) {
399403 }
400404
401405 } catch (Exception e ) {
402- logger .error ("extractProperties: Failed to extract properties from ProducerConfig: {}" ,
406+ logger .error ("[ERR-018] extractProperties: Failed to extract properties from ProducerConfig: {}" ,
403407 e .getMessage (), e );
408+ return null ;
404409 }
405410 }
406411 }
407412
408- logger .error ("extractProperties: No valid configuration object found in arguments" );
413+ logger .error ("[ERR-019] extractProperties: No valid configuration object found in arguments" );
409414 return null ;
410415 }
411416
@@ -595,13 +600,12 @@ public void start() {
595600 COLLECTION_INTERVAL_MS / 2 , // Start sooner for first collection
596601 COLLECTION_INTERVAL_MS ,
597602 TimeUnit .MILLISECONDS );
598- logger .debug ("Metrics collection scheduler started successfully" );
599603 } catch (Exception e ) {
600- logger .error ("Failed to schedule metrics collection: " + e .getMessage (), e );
604+ logger .error ("[ERR-012] Failed to schedule metrics collection: {}" , e .getMessage (), e );
601605 running .set (false );
602606 }
603607 } else {
604- logger .info ("Metrics collector already running" );
608+ logger .debug ("Metrics collector already running" );
605609 }
606610 }
607611
@@ -644,7 +648,7 @@ public void collectAllMetrics() {
644648 skippedCount ++;
645649 }
646650 } catch (Exception e ) {
647- logger .error ("Error collecting metrics for producer {}: {}" , producerId , e .getMessage ());
651+ logger .error ("[ERR-013] Error collecting metrics for producer {}: {}" , producerId , e .getMessage (), e );
648652 }
649653 }
650654
@@ -653,7 +657,7 @@ public void collectAllMetrics() {
653657 totalProducers , successCount , skippedCount );
654658
655659 } catch (Exception e ) {
656- logger .error ("Error in metrics collection cycle: " + e .getMessage (), e );
660+ logger .error ("[ERR-014] Error in metrics collection cycle: {}" , e .getMessage (), e );
657661 }
658662 }
659663
@@ -757,7 +761,7 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
757761 }
758762 }
759763 } catch (Exception snapshotEx ) {
760- // ignore snapshot errors
764+ logger . error ( "[ERR-015] Error extracting metrics snapshot for producer {}: {}" , producerId , snapshotEx . getMessage (), snapshotEx );
761765 }
762766
763767 // Update reporter with latest metrics snapshot
@@ -805,7 +809,7 @@ public boolean collectMetricsForProducer(String producerId, ProducerMetricsInfo
805809
806810 return true ;
807811 } catch (Exception e ) {
808- logger .error ("Error collecting Kafka metrics for producer {}: {}" , producerId , e .getMessage (), e );
812+ logger .error ("[ERR-016] Error collecting Kafka metrics for producer {}: {}" , producerId , e .getMessage (), e );
809813 return false ;
810814 }
811815 }
0 commit comments