@@ -99,38 +99,9 @@ def collect_confluent_metrics(producer: Any) -> Dict[str, Any]:
9999def collect_aiokafka_metrics (producer : Any ) -> Dict [str , Any ]:
100100 """Collect metrics from aiokafka producer."""
101101 try :
102- metrics = producer .metrics ()
103- if not metrics :
104- return {}
105-
106- # Extract relevant producer metrics - flat structure
107- producer_metrics = {}
108-
109- # aiokafka metrics structure is similar to kafka-python
110- for metric_name , metric_data in metrics .items ():
111- # Look for producer-related metrics
112- if any (keyword in metric_name .lower () for keyword in ['producer' , 'record' , 'batch' , 'request' , 'connection' , 'network' , 'io' , 'buffer' , 'compression' ]):
113- # Extract the value from the metric data
114- if hasattr (metric_data , 'value' ):
115- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .value )
116- elif isinstance (metric_data , dict ) and 'value' in metric_data :
117- producer_metrics [metric_name ] = sanitize_metric_value (metric_data ['value' ])
118- elif hasattr (metric_data , 'count' ):
119- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .count )
120- elif hasattr (metric_data , 'mean' ):
121- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .mean )
122- elif hasattr (metric_data , 'rate' ):
123- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .rate )
124- elif hasattr (metric_data , 'total' ):
125- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .total )
126- elif hasattr (metric_data , 'avg' ):
127- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .avg )
128- elif hasattr (metric_data , 'max' ):
129- producer_metrics [metric_name ] = sanitize_metric_value (metric_data .max )
130- else :
131- producer_metrics [metric_name ] = sanitize_metric_value (str (metric_data ))
132-
133- return producer_metrics
102+ # aiokafka producers don't have a metrics() method
103+ # Return empty dict as aiokafka doesn't provide metrics
104+ return {}
134105 except Exception as e :
135106 logger .error ("[ERR-308] Failed to collect aiokafka producer metrics: {}" , e )
136107 return {}
@@ -194,45 +165,9 @@ def collect_confluent_topic_metrics(producer: Any) -> Dict[str, Any]:
194165def collect_aiokafka_topic_metrics (producer : Any ) -> Dict [str , Any ]:
195166 """Collect topic metrics from aiokafka producer."""
196167 try :
197- metrics = producer .metrics ()
198- if not metrics :
199- return {}
200-
201- topic_metrics = {}
202-
203- # Look for topic-specific metrics - nested structure with topic names as keys
204- for metric_name , metric_data in metrics .items ():
205- if 'topic' in metric_name .lower ():
206- # Extract topic name from metric name (e.g., "topic.my-topic.record-send-rate")
207- parts = metric_name .split ('.' )
208- if len (parts ) >= 2 and parts [0 ] == 'topic' :
209- topic_name = parts [1 ]
210- metric_key = '.' .join (parts [2 :]) if len (parts ) > 2 else 'value'
211-
212- if topic_name not in topic_metrics :
213- topic_metrics [topic_name ] = {}
214-
215- # Extract the value from the metric data
216- if hasattr (metric_data , 'value' ):
217- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .value )
218- elif isinstance (metric_data , dict ) and 'value' in metric_data :
219- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data ['value' ])
220- elif hasattr (metric_data , 'count' ):
221- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .count )
222- elif hasattr (metric_data , 'mean' ):
223- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .mean )
224- elif hasattr (metric_data , 'rate' ):
225- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .rate )
226- elif hasattr (metric_data , 'total' ):
227- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .total )
228- elif hasattr (metric_data , 'avg' ):
229- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .avg )
230- elif hasattr (metric_data , 'max' ):
231- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (metric_data .max )
232- else :
233- topic_metrics [topic_name ][metric_key ] = sanitize_metric_value (str (metric_data ))
234-
235- return topic_metrics
168+ # aiokafka producers don't have a metrics() method
169+ # Return empty dict as aiokafka doesn't provide topic metrics
170+ return {}
236171 except Exception as e :
237172 logger .error ("[ERR-312] Failed to collect aiokafka topic metrics: {}" , e )
238173 return {}
@@ -313,54 +248,9 @@ def collect_confluent_node_metrics(producer: Any) -> Dict[str, Any]:
313248def collect_aiokafka_node_metrics (producer : Any ) -> Dict [str , Any ]:
314249 """Collect node metrics from aiokafka producer."""
315250 try :
316- metrics = producer .metrics ()
317- if not metrics :
318- return {}
319-
320- node_metrics = {}
321-
322- # Look for node/broker-specific metrics - nested structure with node IDs as keys
323- for metric_name , metric_data in metrics .items ():
324- if any (keyword in metric_name .lower () for keyword in ['node' , 'broker' , 'connection' , 'network' ]):
325- # Extract node ID from metric name (e.g., "node.8.request-rate")
326- parts = metric_name .split ('.' )
327- if len (parts ) >= 2 and parts [0 ] in ['node' , 'broker' ]:
328- node_id = parts [1 ]
329-
330- # Filter out bootstrap node metrics
331- if node_id .startswith ('node-bootstrap' ):
332- continue
333-
334- # Remove "node-" prefix if present
335- if node_id .startswith ('node-' ):
336- node_id = node_id [5 :] # Remove "node-" prefix
337-
338- metric_key = '.' .join (parts [2 :]) if len (parts ) > 2 else 'value'
339-
340- if node_id not in node_metrics :
341- node_metrics [node_id ] = {}
342-
343- # Extract the value from the metric data
344- if hasattr (metric_data , 'value' ):
345- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .value )
346- elif isinstance (metric_data , dict ) and 'value' in metric_data :
347- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data ['value' ])
348- elif hasattr (metric_data , 'count' ):
349- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .count )
350- elif hasattr (metric_data , 'mean' ):
351- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .mean )
352- elif hasattr (metric_data , 'rate' ):
353- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .rate )
354- elif hasattr (metric_data , 'total' ):
355- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .total )
356- elif hasattr (metric_data , 'avg' ):
357- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .avg )
358- elif hasattr (metric_data , 'max' ):
359- node_metrics [node_id ][metric_key ] = sanitize_metric_value (metric_data .max )
360- else :
361- node_metrics [node_id ][metric_key ] = sanitize_metric_value (str (metric_data ))
362-
363- return node_metrics
251+ # aiokafka producers don't have a metrics() method
252+ # Return empty dict as aiokafka doesn't provide node metrics
253+ return {}
364254 except Exception as e :
365255 logger .error ("[ERR-316] Failed to collect aiokafka node metrics: {}" , e )
366256 return {}
0 commit comments