@@ -40,6 +40,36 @@ def __init__(
4040 self .metadata = metadata
4141 self .topics_env = topics_env or []
4242 self .start_time_ms = int (time .time () * 1000 ) # Unix timestamp in milliseconds
43+
44+ # Metrics cache for maintaining last known values
45+ self .producer_metrics_cache : Dict [str , Any ] = {}
46+ self .topic_metrics_cache : Dict [str , Any ] = {}
47+ self .node_metrics_cache : Dict [str , Any ] = {}
48+
49+ # Cache update tracking
50+ self .last_cache_update = 0.0
51+ self .cache_update_interval = 30.0 # Update cache every 30 seconds
52+
53+ # Total tracking fields for cumulative metrics
54+ self .producer_totals = {
55+ 'outgoing-byte-total' : 0 ,
56+ 'record-send-total' : 0
57+ }
58+ self .topic_totals = {} # topic_name -> {metric -> total}
59+
60+ # Fractional tracking to avoid rounding errors
61+ self .producer_fractionals = {
62+ 'outgoing-byte-total' : 0.0 ,
63+ 'record-send-total' : 0.0
64+ }
65+ self .topic_fractionals = {} # topic_name -> {metric -> fractional}
66+
67+ # Compression rate tracking for preserving last known good values
68+ self .last_known_producer_compression_rate : Optional [float ] = None
69+ self .last_known_topic_compression_rates : Dict [str , float ] = {}
70+
71+ # Record size tracking for preserving last known good values
72+ self .last_known_producer_record_size : Optional [float ] = None
4373
4474 def record_topic (self , topic : str ):
4575 """Record a topic that this producer writes to."""
@@ -49,6 +79,261 @@ def close(self):
4979 """Mark this tracker as inactive."""
5080 self .active = False
5181
def should_update_cache(self) -> bool:
    """Return True once the configured interval has elapsed since the last cache refresh."""
    elapsed = time.time() - self.last_cache_update
    return elapsed >= self.cache_update_interval
85+
def update_cache_if_needed(self):
    """Refresh the metrics cache, but at most once per update interval."""
    if not self.should_update_cache():
        return
    self._update_cache_from_producer()
    self.last_cache_update = time.time()
91+
def _update_cache_from_producer(self):
    """Refresh the metrics cache from the live producer.

    Strategy: collect fresh metrics, derive any cumulative totals that are
    missing from the native metrics by integrating rates over the elapsed
    interval, merge fresh values over the cached ones (fresh wins, cache
    fills gaps), preserve last known good compression rates and record
    sizes, then store the merged result back into the cache. Any failure is
    logged and swallowed so the calling heartbeat thread keeps running.
    """
    try:
        from ..util.metrics import collect_all_metrics, merge_metrics_with_cache

        # Collect fresh metrics from the underlying client
        current_producer_metrics, current_topic_metrics, current_node_metrics = collect_all_metrics(self.producer, self.library)

        # Snapshot of the current cache (copies, so merging cannot corrupt it)
        cached_producer_metrics, cached_topic_metrics, cached_node_metrics = self.get_cached_metrics()

        # Elapsed time since the previous refresh; 0 on the first run so no
        # rate integration happens before a baseline timestamp exists
        current_time = time.time()
        time_diff = current_time - self.last_cache_update if self.last_cache_update > 0 else 0

        # Derive producer-level totals from rates only if native totals are missing
        self._update_producer_totals_if_missing(current_producer_metrics, time_diff)

        # Derive per-topic totals from rates only if native totals are missing
        self._update_topic_totals_if_missing(current_topic_metrics, time_diff)

        # Merge fresh metrics with cache (fresh takes precedence, cache provides fallbacks)
        merged_producer_metrics, merged_topic_metrics, merged_node_metrics = merge_metrics_with_cache(
            current_producer_metrics, current_topic_metrics, current_node_metrics,
            cached_producer_metrics, cached_topic_metrics, cached_node_metrics
        )

        # Overlay the calculated producer-level cumulative totals
        merged_producer_metrics.update(self.producer_totals)

        # Keep last known good compression rates / record sizes when current is 0
        self._preserve_compression_rates(merged_producer_metrics, merged_topic_metrics)
        self._preserve_record_sizes(merged_producer_metrics)

        # Overlay per-topic totals onto the merged topic metrics
        for topic_name, totals in self.topic_totals.items():
            if topic_name in merged_topic_metrics:
                merged_topic_metrics[topic_name].update(totals)
            else:
                merged_topic_metrics[topic_name] = totals

        # Publish the merged results (including totals) as the new cache
        self.producer_metrics_cache = merged_producer_metrics
        self.topic_metrics_cache = merged_topic_metrics
        self.node_metrics_cache = merged_node_metrics

        logger.debug("Updated metrics cache for producer {} (merged fresh + cached + totals)", self.client_id)
    except Exception as e:
        logger.error("[ERR-319] Failed to collect metrics for cache update for {}: {}", self.client_id, e)
141+
def update_metrics_cache(self, producer_metrics: Dict[str, Any], topic_metrics: Dict[str, Any], node_metrics: Dict[str, Any]) -> None:
    """Replace the whole metrics cache with shallow copies of the supplied snapshots."""
    self.producer_metrics_cache = dict(producer_metrics)
    self.topic_metrics_cache = dict(topic_metrics)
    self.node_metrics_cache = dict(node_metrics)
147+
def update_metrics_cache_selective(self, producer_metrics: Dict[str, Any], topic_metrics: Dict[str, Any], node_metrics: Dict[str, Any]) -> None:
    """Merge new metric values into the cache without discarding cached entries.

    Only metrics whose value is not None overwrite the cache; anything absent
    from the fresh collection (or None) keeps its previously cached value.
    So with X and Y cached and a fresh collection carrying X and Z, the
    result holds the new X, the cached Y, and the new Z.
    """
    # Producer level: overwrite individual metrics only when a value exists.
    for metric_name, metric_value in (producer_metrics or {}).items():
        if metric_value is not None:
            self.producer_metrics_cache[metric_name] = metric_value

    # Topic level: ensure a bucket per topic, then overwrite per metric.
    if topic_metrics:
        for topic_name, topic_metric_data in topic_metrics.items():
            bucket = self.topic_metrics_cache.setdefault(topic_name, {})
            for metric_name, metric_value in (topic_metric_data or {}).items():
                if metric_value is not None:
                    bucket[metric_name] = metric_value

    # Node level: same selective overwrite, keyed by node id.
    if node_metrics:
        for node_id, node_metric_data in node_metrics.items():
            bucket = self.node_metrics_cache.setdefault(node_id, {})
            for metric_name, metric_value in (node_metric_data or {}).items():
                if metric_value is not None:
                    bucket[metric_name] = metric_value
179+
180+ def _update_producer_totals_if_missing (self , producer_metrics : Dict [str , Any ], time_diff : float ) -> None :
181+ """Update producer-level totals by integrating rates over time, only if native totals are missing.
182+
183+ These totals are calculated because they don't exist in native Kafka metrics.
184+ We integrate rate metrics (events/second) over time to get cumulative totals.
185+ Uses fractional tracking to avoid rounding errors at each step.
186+ """
187+ if time_diff <= 0 :
188+ return
189+
190+ # Map of rate metrics to their corresponding total fields
191+ rate_to_total_mapping = {
192+ 'outgoing-byte-rate' : 'outgoing-byte-total' ,
193+ 'record-send-rate' : 'record-send-total'
194+ }
195+
196+ for rate_metric , total_metric in rate_to_total_mapping .items ():
197+ # Only calculate if native total is missing
198+ if total_metric not in producer_metrics :
199+ if rate_metric in producer_metrics :
200+ rate_value = producer_metrics [rate_metric ]
201+ if isinstance (rate_value , (int , float )) and rate_value > 0 :
202+ # Add to fractional accumulator to avoid rounding errors
203+ incremental_fractional = rate_value * time_diff
204+ self .producer_fractionals [total_metric ] += incremental_fractional
205+
206+ # Convert accumulated fractional to integer (round only once)
207+ new_total = int (round (self .producer_fractionals [total_metric ]))
208+ old_total = self .producer_totals [total_metric ]
209+ actual_increment = new_total - old_total
210+
211+ if actual_increment > 0 :
212+ self .producer_totals [total_metric ] = new_total
213+ logger .debug ("Calculated missing {}: +{} (rate: {:.2f} * time: {:.2f}s, fractional: {:.3f})" ,
214+ total_metric , actual_increment , rate_value , time_diff , self .producer_fractionals [total_metric ])
215+ else :
216+ # Native total exists, use it instead of calculated total
217+ native_total = producer_metrics [total_metric ]
218+ if isinstance (native_total , (int , float )) and native_total > 0 :
219+ self .producer_totals [total_metric ] = int (native_total )
220+ # Reset fractional to match native total
221+ self .producer_fractionals [total_metric ] = float (native_total )
222+ logger .debug ("Using native {}: {}" , total_metric , native_total )
223+
224+ def _update_topic_totals_if_missing (self , topic_metrics : Dict [str , Any ], time_diff : float ) -> None :
225+ """Update topic-level totals by integrating rates over time, only if native totals are missing.
226+
227+ These totals are calculated because they don't exist in native Kafka metrics.
228+ We integrate rate metrics (events/second) over time to get cumulative totals.
229+ Uses fractional tracking to avoid rounding errors at each step.
230+ """
231+ if time_diff <= 0 :
232+ return
233+
234+ # Map of rate metrics to their corresponding total fields (topic level)
235+ rate_to_total_mapping = {
236+ 'record-send-rate' : 'record-send-total' ,
237+ 'byte-rate' : 'byte-total'
238+ }
239+
240+ for topic_name , topic_metric_data in topic_metrics .items ():
241+ if topic_name not in self .topic_totals :
242+ self .topic_totals [topic_name ] = {
243+ 'record-send-total' : 0 ,
244+ 'byte-total' : 0
245+ }
246+ if topic_name not in self .topic_fractionals :
247+ self .topic_fractionals [topic_name ] = {
248+ 'record-send-total' : 0.0 ,
249+ 'byte-total' : 0.0
250+ }
251+
252+ for rate_metric , total_metric in rate_to_total_mapping .items ():
253+ # Only calculate if native total is missing
254+ if total_metric not in topic_metric_data :
255+ if rate_metric in topic_metric_data :
256+ rate_value = topic_metric_data [rate_metric ]
257+ if isinstance (rate_value , (int , float )) and rate_value > 0 :
258+ # Add to fractional accumulator to avoid rounding errors
259+ incremental_fractional = rate_value * time_diff
260+ self .topic_fractionals [topic_name ][total_metric ] += incremental_fractional
261+
262+ # Convert accumulated fractional to integer (round only once)
263+ new_total = int (round (self .topic_fractionals [topic_name ][total_metric ]))
264+ old_total = self .topic_totals [topic_name ][total_metric ]
265+ actual_increment = new_total - old_total
266+
267+ if actual_increment > 0 :
268+ self .topic_totals [topic_name ][total_metric ] = new_total
269+ logger .debug ("Calculated missing topic {} {}: +{} (rate: {:.2f} * time: {:.2f}s, fractional: {:.3f})" ,
270+ topic_name , total_metric , actual_increment , rate_value , time_diff ,
271+ self .topic_fractionals [topic_name ][total_metric ])
272+ else :
273+ # Native total exists, use it instead of calculated total
274+ native_total = topic_metric_data [total_metric ]
275+ if isinstance (native_total , (int , float )) and native_total > 0 :
276+ self .topic_totals [topic_name ][total_metric ] = int (native_total )
277+ # Reset fractional to match native total
278+ self .topic_fractionals [topic_name ][total_metric ] = float (native_total )
279+ logger .debug ("Using native topic {} {}: {}" , topic_name , total_metric , native_total )
280+
281+ def _preserve_compression_rates (self , producer_metrics : Dict [str , Any ], topic_metrics : Dict [str , Any ]) -> None :
282+ """Preserve last known good compression rates when current values are 0.
283+
284+ This prevents compression rates from being reset to 0 when no compression
285+ is happening temporarily (e.g., during idle periods).
286+ """
287+ # Handle producer-level compression rate
288+ if 'compression-rate-avg' in producer_metrics :
289+ current_rate = producer_metrics ['compression-rate-avg' ]
290+ if isinstance (current_rate , (int , float )):
291+ if current_rate > 0 :
292+ # Update last known good value
293+ self .last_known_producer_compression_rate = current_rate
294+ elif current_rate == 0 and self .last_known_producer_compression_rate is not None :
295+ # Use last known good value when current is 0
296+ producer_metrics ['compression-rate-avg' ] = self .last_known_producer_compression_rate
297+ logger .debug ("Preserved producer compression rate: {} (current was 0)" ,
298+ self .last_known_producer_compression_rate )
299+
300+ # Handle topic-level compression rates
301+ for topic_name , topic_metric_data in topic_metrics .items ():
302+ if 'compression-rate' in topic_metric_data :
303+ current_rate = topic_metric_data ['compression-rate' ]
304+ if isinstance (current_rate , (int , float )):
305+ if current_rate > 0 :
306+ # Update last known good value for this topic
307+ self .last_known_topic_compression_rates [topic_name ] = current_rate
308+ elif current_rate == 0 and topic_name in self .last_known_topic_compression_rates :
309+ # Use last known good value when current is 0
310+ topic_metric_data ['compression-rate' ] = self .last_known_topic_compression_rates [topic_name ]
311+ logger .debug ("Preserved topic {} compression rate: {} (current was 0)" ,
312+ topic_name , self .last_known_topic_compression_rates [topic_name ])
313+
314+ def _preserve_record_sizes (self , producer_metrics : Dict [str , Any ]) -> None :
315+ """Preserve last known good record size when current value is 0.
316+
317+ This prevents record size from being reset to 0 when no records
318+ are being sent temporarily (e.g., during idle periods).
319+ """
320+ # Handle producer-level record size
321+ if 'record-size-avg' in producer_metrics :
322+ current_size = producer_metrics ['record-size-avg' ]
323+ if isinstance (current_size , (int , float )):
324+ if current_size > 0 :
325+ # Update last known good value
326+ self .last_known_producer_record_size = current_size
327+ elif current_size == 0 and self .last_known_producer_record_size is not None :
328+ # Use last known good value when current is 0
329+ producer_metrics ['record-size-avg' ] = self .last_known_producer_record_size
330+ logger .debug ("Preserved producer record size: {} (current was 0)" ,
331+ self .last_known_producer_record_size )
332+
def get_cached_metrics(self) -> tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
    """Return shallow copies of the producer, topic, and node metric caches."""
    return (
        dict(self.producer_metrics_cache),
        dict(self.topic_metrics_cache),
        dict(self.node_metrics_cache),
    )
336+
52337 def determine_topic (self ) -> str :
53338 """Get the most impactful topic for this producer based on metadata analysis."""
54339 if not self .metadata or not self .metadata .get ("topics_configuration" ):
@@ -104,18 +389,30 @@ def unregister_tracker(cls, tracker_id: str):
104389 cls ._trackers .pop (tracker_id , None )
105390
def run(self):
    """Main heartbeat loop - handles both reporting and metrics cache updates."""
    # Loop until a stop is requested; each pass services every registered tracker.
    while not self._stop_event.is_set():
        now = time.time()
        # Snapshot the tracker list under the lock so registration/unregistration
        # cannot race with iteration.
        # NOTE(review): the snapshot suggests iteration happens outside the lock;
        # source formatting is ambiguous — confirm the original nesting.
        with self._track_lock:
            trackers = list(self._trackers.values())

        for tr in trackers:
            if not tr.active:
                continue

            # Update cache if needed (every 30 seconds)
            try:
                tr.update_cache_if_needed()
            except Exception as e:
                logger.error("[ERR-318] Failed to update metrics cache for {}: {}", tr.client_id, e)

            # Send heartbeat if needed (every 5 minutes by default)
            if (now - tr.last_hb) * 1000 < tr.report_interval_ms:
                continue
            try:
                from ..core.reporter import send_clients_msg
                send_clients_msg(tr, tr.error)
                # Only advance the heartbeat timestamp on a successful send,
                # so a failed send is retried on the next pass.
                tr.last_hb = now
            except Exception as e:
                logger.error("[ERR-320] Failed to send heartbeat for {}: {}", tr.client_id, e)

        time.sleep(1)
0 commit comments