Skip to content

Commit f20ddf6

Browse files
Support metrics collection for the Confluent library
1 parent a8336bb commit f20ddf6

File tree

4 files changed

+281
-69
lines changed

4 files changed

+281
-69
lines changed

examples/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
kafka-python==2.2.14
2-
confluent-kafka==2.3.0
2+
confluent-kafka==2.11.0
33
aiokafka==0.10.0
44
aws-msk-iam-sasl-signer-python==1.0.2

superclient/agent/interceptor.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
"""Producer interception functionality."""
22

33
import os
4+
import uuid
45
from typing import Any, Dict
56

67
from ..util.logger import get_logger
78
from ..util.config import get_topics_list, is_disabled
9+
from ..util.metrics import configure_confluent_stats_callback
810
from .metadata import fetch_metadata_sync, optimal_cfg, _DEFAULTS
911
from ..core.reporter import send_clients_msg
1012
from ..core.manager import normalize_bootstrap
@@ -345,10 +347,18 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
345347
logger.debug("Overriding configuration: {} ((not set) -> {})", k, v)
346348
conf[k] = v
347349

350+
351+
# Generate UUID for this producer
352+
tracker_uuid = str(uuid.uuid4())
353+
354+
# Configure stats callback for metrics collection
355+
conf = configure_confluent_stats_callback(conf, tracker_uuid)
356+
348357
# Create the producer with optimized configuration
349358
self._producer = Producer(conf, *args, **kwargs)
350359

351360
report_interval = metadata.get("report_interval_ms") if metadata else _DEFAULT_REPORT_INTERVAL_MS
361+
# Create tracker with the generated UUID
352362
self._tracker = ProducerTracker(
353363
lib="confluent",
354364
producer=self._producer,
@@ -357,10 +367,12 @@ def __init__(self, conf: Dict[str, Any], *args, **kwargs):
357367
orig_cfg=orig_cfg,
358368
opt_cfg=opt_cfg,
359369
report_interval_ms=int(report_interval or _DEFAULT_REPORT_INTERVAL_MS),
360-
error=error_msg, # Store error message in tracker
370+
error=error_msg,
361371
metadata=metadata,
362372
topics_env=topics_env,
373+
uuid=tracker_uuid, # Use the generated UUID
363374
)
375+
364376
Heartbeat.register_tracker(self._tracker)
365377

366378
send_clients_msg(self._tracker, error_msg)
@@ -397,12 +409,15 @@ def __del__(self):
397409
self._superstream_closed = True
398410
self._tracker.close()
399411
Heartbeat.unregister_tracker(self._tracker.uuid)
412+
413+
# Remove metrics extractor from registry
414+
from ..util.metrics import remove_producer_metrics_extractor
415+
remove_producer_metrics_extractor(self._tracker.uuid)
416+
400417
logger.debug("Superstream tracking stopped for confluent-kafka producer with client_id: {}",
401418
getattr(self._tracker, 'client_id', 'unknown'))
402419
except Exception as e:
403420
logger.error("Error during automatic cleanup: {}", e)
404-
else:
405-
logger.debug("Producer already cleaned up or no tracker found")
406421

407422
def __getattr__(self, name):
408423
"""Delegate all other attributes to the underlying producer."""

superclient/agent/tracker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ def __init__(
2424
error: str = "",
2525
metadata: Optional[Dict[str, Any]] = None,
2626
topics_env: Optional[list[str]] = None,
27+
uuid: Optional[str] = None,
2728
) -> None:
28-
self.uuid = str(uuid.uuid4())
29+
self.uuid = uuid if uuid else str(uuid.uuid4())
2930
self.library = lib
3031
self.producer = producer
3132
self.bootstrap = bootstrap
@@ -95,7 +96,7 @@ def _update_cache_from_producer(self):
9596
from ..util.metrics import collect_all_metrics, merge_metrics_with_cache
9697

9798
# Collect fresh metrics
98-
current_producer_metrics, current_topic_metrics, current_node_metrics = collect_all_metrics(self.producer, self.library)
99+
current_producer_metrics, current_topic_metrics, current_node_metrics = collect_all_metrics(self.producer, self.library, self.uuid)
99100

100101
# Get current cached metrics
101102
cached_producer_metrics, cached_topic_metrics, cached_node_metrics = self.get_cached_metrics()

0 commit comments

Comments (0)