|
3 | 3 | import os |
4 | 4 | import uuid |
5 | 5 | from typing import Any, Dict |
| 6 | +import importlib |
6 | 7 |
|
7 | 8 | from ..util.logger import get_logger |
8 | 9 | from ..util.config import get_topics_list, is_disabled |
@@ -245,8 +246,102 @@ async def stop_patch(*a, **kw): |
245 | 246 | await original_stop(*a, **kw) |
246 | 247 |
|
247 | 248 | self.stop = stop_patch |
| 249 | + |
| 250 | + # Patch SendProduceReqHandler.create_request on the class itself (shared by every producer); per-producer behaviour comes from the tracker stored on each sender |
| 251 | + sender_mod = importlib.import_module("aiokafka.producer.sender") |
| 252 | + |
| 253 | + # Only patch once globally |
| 254 | + if not hasattr(sender_mod.SendProduceReqHandler, '_superstream_patched'): |
| 255 | + orig_create_request = sender_mod.SendProduceReqHandler.create_request |
| 256 | + |
def create_request_with_metrics(self_handler):
    """Record per-producer send metrics, then build the request as usual.

    Installed as a replacement for ``SendProduceReqHandler.create_request``.
    When the handler's sender carries a ``_superstream_tracker``, the batches
    in ``self_handler._batches`` are summed into cumulative counters kept in
    ``tracker._superstream_metrics``:

    * ``outgoing-byte-total``, ``record-send-total``, ``uncompressed-byte-total``
    * ``compression-rate-avg`` and ``record-size-avg``, derived from the totals
    * ``topics[<topic>]`` holding ``byte-total``, ``record-send-total``,
      ``uncompressed-total`` and ``compression-rate``

    Handlers whose sender has no tracker, a ``None`` tracker, or a tracker
    whose ``client_id`` starts with ``_SUPERLIB_PREFIX`` are passed straight
    through without touching any metrics.

    Metrics collection is best-effort: any error raised while gathering or
    storing metrics is logged at debug level and never prevents the original
    ``create_request`` from running.

    Args:
        self_handler: The handler instance the patched method is bound to.
            It must expose ``_sender`` and ``_batches`` (a mapping whose keys
            have a ``topic`` attribute and whose values expose
            ``record_count`` and ``get_data_buffer()``).

    Returns:
        Whatever the original ``create_request`` returns for this handler.
    """
    try:
        # A missing attribute and an explicit None both mean "not tracked".
        tracker = getattr(self_handler._sender, '_superstream_tracker', None)
        if tracker is not None and not tracker.client_id.startswith(_SUPERLIB_PREFIX):
            # NOTE(review): the uncompressed size is not measured here; it is
            # derived from the compressed size with a fixed assumed ratio
            # (30% compression). The reported compression rates therefore
            # reflect this constant, not the producer's real compression.
            assumed_compression_ratio = 0.7

            total_uncompressed = 0
            total_compressed = 0
            total_records = 0
            topic_stats = {}
            for tp, batch in self_handler._batches.items():
                record_count = batch.record_count

                # The buffer may be unavailable; count such a batch as 0 bytes
                # rather than dropping the whole request's metrics.
                try:
                    compressed = len(batch.get_data_buffer())
                except (AttributeError, TypeError):
                    compressed = 0

                if record_count > 0:
                    uncompressed = int(compressed / assumed_compression_ratio)
                else:
                    uncompressed = 0

                total_uncompressed += uncompressed
                total_compressed += compressed
                total_records += record_count

                per_topic = topic_stats.setdefault(
                    tp.topic, {'uncompressed': 0, 'compressed': 0, 'records': 0}
                )
                per_topic['uncompressed'] += uncompressed
                per_topic['compressed'] += compressed
                per_topic['records'] += record_count

            if total_records > 0:
                m = getattr(tracker, '_superstream_metrics', {})
                tracker._superstream_metrics = m

                # Cumulative counters across every request seen so far.
                m['outgoing-byte-total'] = m.get('outgoing-byte-total', 0) + total_compressed
                m['record-send-total'] = m.get('record-send-total', 0) + total_records
                m['uncompressed-byte-total'] = m.get('uncompressed-byte-total', 0) + total_uncompressed

                # Ratios are recomputed from the running totals each time.
                m['compression-rate-avg'] = (
                    m['outgoing-byte-total'] / m['uncompressed-byte-total']
                    if m['uncompressed-byte-total'] else 1.0
                )
                m['record-size-avg'] = (
                    m['uncompressed-byte-total'] / m['record-send-total']
                    if m['record-send-total'] else 0
                )

                topics = m.setdefault('topics', {})
                for topic, stats in topic_stats.items():
                    t = topics.setdefault(
                        topic,
                        {'byte-total': 0, 'record-send-total': 0, 'uncompressed-total': 0},
                    )
                    # .get() tolerates topic entries that lack one of the keys.
                    t['byte-total'] = t.get('byte-total', 0) + stats['compressed']
                    t['record-send-total'] = t.get('record-send-total', 0) + stats['records']
                    t['uncompressed-total'] = t.get('uncompressed-total', 0) + stats['uncompressed']
                    t['compression-rate'] = (
                        t['byte-total'] / t['uncompressed-total']
                        if t['uncompressed-total'] else 1.0
                    )
    except Exception:
        # Instrumentation must never break the produce path. Imported here so
        # the normal (no-error) path pays nothing for it.
        import logging
        logging.getLogger(__name__).debug(
            "superstream metrics collection failed; sending request unmodified",
            exc_info=True,
        )

    return orig_create_request(self_handler)
| 334 | + |
| 335 | + sender_mod.SendProduceReqHandler.create_request = create_request_with_metrics |
| 336 | + sender_mod.SendProduceReqHandler._superstream_patched = True |
| 337 | + |
248 | 338 | self._superstream_patch = True |
249 | 339 | orig_init(self, *args, **kwargs) |
| 340 | + |
| 341 | + # Store tracker reference in the sender for metrics collection |
| 342 | + if hasattr(self, '_sender'): |
| 343 | + self._sender._superstream_tracker = tr |
| 344 | + |
250 | 345 | send_clients_msg(tr, error_msg) |
251 | 346 |
|
252 | 347 | # Log success message based on whether defaults were used |
|
0 commit comments