Skip to content

Commit b176457

Browse files
feat: add consumer_member_id to metrics by default (#462)
* feat: add consumer_member_id to metrics by default - Add ConsumerMetricsWrapper to automatically include consumer_member_id in all metrics - Add get_consumer_metrics() function for creating consumer-aware metrics instances - Update MetricsBuffer to use consumer-aware metrics automatically - Update all processor metrics calls to include consumer member ID - Add comprehensive tests for new functionality This enables better observability and debugging in distributed consumer environments by automatically tagging all consumer-related metrics with the consumer member ID. * fix: resolve mypy strict type checking issues in consumer metrics - Update MetricsBuffer record_* methods to use MetricName instead of str - Add MetricName import to processor.py - Fix test to use valid metric names from MetricName literal - Ensure strict type safety compliance for consumer metrics wrapper All mypy --strict checks now pass with no errors. * Remove unnecessary metrics wrapper methods in MetricsBuffer The record_timing, record_increment, and record_gauge wrapper methods in MetricsBuffer claimed to add default tags but were just pass-through functions that added no additional functionality. This removes the misleading wrappers and replaces all usage with direct calls to the underlying self.metrics methods. Default tags like consumer member ID are already handled by get_consumer_metrics(consumer.member_id) at the appropriate level. * remove unused --------- Co-authored-by: Markus Unterwaditzer <[email protected]>
1 parent 8ab18aa commit b176457

File tree

3 files changed

+81
-9
lines changed

3 files changed

+81
-9
lines changed

arroyo/processing/processor.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
)
3030
from arroyo.types import BrokerValue, Message, Partition, Topic, TStrategyPayload
3131
from arroyo.utils.logging import handle_internal_error
32-
from arroyo.utils.metrics import get_metrics
32+
from arroyo.utils.metrics import get_consumer_metrics
3333

3434
logger = logging.getLogger(__name__)
3535

@@ -89,8 +89,9 @@ class InvalidStateError(RuntimeError):
8989

9090

9191
class MetricsBuffer:
92-
def __init__(self) -> None:
93-
self.metrics = get_metrics()
92+
def __init__(self, consumer: Consumer[Any]) -> None:
93+
self.metrics = get_consumer_metrics(consumer.member_id)
94+
self.__consumer = consumer
9495
self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
9596
self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
9697
self.__reset()
@@ -142,7 +143,7 @@ def __init__(
142143
) -> None:
143144
self.__consumer = consumer
144145
self.__processor_factory = processor_factory
145-
self.__metrics_buffer = MetricsBuffer()
146+
self.__metrics_buffer = MetricsBuffer(consumer)
146147

147148
self.__processing_strategy: Optional[
148149
ProcessingStrategy[TStrategyPayload]
@@ -236,7 +237,7 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
236237
logger.info("New partitions assigned: %r", partitions)
237238
logger.info("Member id: %r", self.__consumer.member_id)
238239
self.__metrics_buffer.metrics.increment(
239-
"arroyo.consumer.partitions_assigned.count", len(partitions), tags={"consumer_member_id": self.__consumer.member_id}
240+
"arroyo.consumer.partitions_assigned.count", len(partitions)
240241
)
241242

242243
current_partitions = dict(self.__consumer.tell())
@@ -262,7 +263,7 @@ def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
262263
logger.info("Partitions to revoke: %r", partitions)
263264

264265
self.__metrics_buffer.metrics.increment(
265-
"arroyo.consumer.partitions_revoked.count", len(partitions), tags={"consumer_member_id": self.__consumer.member_id}
266+
"arroyo.consumer.partitions_revoked.count", len(partitions)
266267
)
267268

268269
if partitions:

arroyo/utils/metrics.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,42 @@ def timing(
4545
raise NotImplementedError
4646

4747

48+
class ConsumerMetricsWrapper(Metrics):
49+
"""
50+
A wrapper around a metrics backend that automatically adds consumer_member_id
51+
to all metrics calls.
52+
"""
53+
54+
def __init__(self, metrics: Metrics, consumer_member_id: str) -> None:
55+
self.__metrics = metrics
56+
self.__consumer_member_id = consumer_member_id
57+
58+
def _add_consumer_tag(self, tags: Optional[Tags]) -> Tags:
59+
"""Add consumer_member_id to the provided tags."""
60+
consumer_tags = {"consumer_member_id": self.__consumer_member_id}
61+
if tags:
62+
return {**consumer_tags, **tags}
63+
return consumer_tags
64+
65+
def increment(
66+
self,
67+
name: MetricName,
68+
value: Union[int, float] = 1,
69+
tags: Optional[Tags] = None,
70+
) -> None:
71+
self.__metrics.increment(name, value, tags=self._add_consumer_tag(tags))
72+
73+
def gauge(
74+
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
75+
) -> None:
76+
self.__metrics.gauge(name, value, tags=self._add_consumer_tag(tags))
77+
78+
def timing(
79+
self, name: MetricName, value: Union[int, float], tags: Optional[Tags] = None
80+
) -> None:
81+
self.__metrics.timing(name, value, tags=self._add_consumer_tag(tags))
82+
83+
4884
class DummyMetricsBackend(Metrics):
4985
"""
5086
Default metrics backend that does not record anything.
@@ -133,4 +169,12 @@ def get_metrics() -> Metrics:
133169
return _metrics_backend
134170

135171

136-
__all__ = ["configure_metrics", "Metrics", "MetricName"]
172+
def get_consumer_metrics(consumer_member_id: str) -> Metrics:
173+
"""
174+
Get a metrics backend that automatically adds consumer_member_id to all metrics.
175+
"""
176+
base_metrics = get_metrics()
177+
return ConsumerMetricsWrapper(base_metrics, consumer_member_id)
178+
179+
180+
__all__ = ["configure_metrics", "Metrics", "MetricName", "Tags", "get_consumer_metrics"]

tests/utils/test_metrics.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import pytest
22

3-
from arroyo.utils.metrics import Gauge, MetricName, configure_metrics, get_metrics
3+
from arroyo.utils.metrics import Gauge, MetricName, configure_metrics, get_metrics, get_consumer_metrics
44
from tests.metrics import Gauge as GaugeCall
5-
from tests.metrics import TestingMetricsBackend, _TestingMetricsBackend
5+
from tests.metrics import Increment, Timing, TestingMetricsBackend, _TestingMetricsBackend
66

77

88
def test_gauge_simple() -> None:
@@ -31,3 +31,30 @@ def test_configure_metrics() -> None:
3131
# Can be reset to something else with force
3232
configure_metrics(_TestingMetricsBackend(), force=True)
3333
assert get_metrics() != TestingMetricsBackend
34+
35+
36+
def test_consumer_metrics_wrapper() -> None:
37+
"""Test that ConsumerMetricsWrapper automatically adds consumer_member_id to all metrics."""
38+
# Reset to a fresh backend
39+
backend = _TestingMetricsBackend()
40+
configure_metrics(backend, force=True)
41+
42+
consumer_member_id = "test-consumer-123"
43+
consumer_metrics = get_consumer_metrics(consumer_member_id)
44+
45+
# Test increment
46+
consumer_metrics.increment("arroyo.consumer.run.count", 5, tags={"extra": "tag"})
47+
48+
# Test gauge
49+
consumer_metrics.gauge("arroyo.consumer.librdkafka.total_queue_size", 10.5)
50+
51+
# Test timing
52+
consumer_metrics.timing("arroyo.consumer.poll.time", 100, tags={"another": "tag"})
53+
54+
expected_calls = [
55+
Increment("arroyo.consumer.run.count", 5, {"consumer_member_id": consumer_member_id, "extra": "tag"}),
56+
GaugeCall("arroyo.consumer.librdkafka.total_queue_size", 10.5, {"consumer_member_id": consumer_member_id}),
57+
Timing("arroyo.consumer.poll.time", 100, {"consumer_member_id": consumer_member_id, "another": "tag"}),
58+
]
59+
60+
assert backend.calls == expected_calls

0 commit comments

Comments
 (0)