Skip to content

Commit c5614d7

Browse files
authored
fix(metrics): Re-set consumer member ID tag after every partition assignment (#466)
* fix(metrics): Re-set consumer member ID tag after every partition assignment

  The member ID tag is set too early, to a static value.

* fix tests
1 parent c02a2a5 commit c5614d7

File tree

3 files changed

+48
-19
lines changed

3 files changed

+48
-19
lines changed

arroyo/processing/processor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,8 @@ class InvalidStateError(RuntimeError):
8989

9090

9191
class MetricsBuffer:
92-
def __init__(self, consumer: Consumer[Any]) -> None:
93-
self.metrics = get_consumer_metrics(consumer.member_id)
94-
self.__consumer = consumer
92+
def __init__(self) -> None:
93+
self.metrics = get_consumer_metrics()
9594
self.__timers: MutableMapping[ConsumerTiming, float] = defaultdict(float)
9695
self.__counters: MutableMapping[ConsumerCounter, int] = defaultdict(int)
9796
self.__reset()
@@ -143,7 +142,7 @@ def __init__(
143142
) -> None:
144143
self.__consumer = consumer
145144
self.__processor_factory = processor_factory
146-
self.__metrics_buffer = MetricsBuffer(consumer)
145+
self.__metrics_buffer = MetricsBuffer()
147146

148147
self.__processing_strategy: Optional[
149148
ProcessingStrategy[TStrategyPayload]
@@ -236,6 +235,8 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None:
236235
def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
237236
logger.info("New partitions assigned: %r", partitions)
238237
logger.info("Member id: %r", self.__consumer.member_id)
238+
self.__metrics_buffer.metrics.consumer_member_id = self.__consumer.member_id
239+
239240
self.__metrics_buffer.metrics.increment(
240241
"arroyo.consumer.partitions_assigned.count", len(partitions)
241242
)
@@ -245,6 +246,7 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
245246

246247
if self.__dlq_policy:
247248
self.__dlq_policy.reset_dlq_limits(current_partitions)
249+
248250
if current_partitions:
249251
if self.__processing_strategy is not None:
250252
# TODO: for cooperative-sticky rebalancing this can happen

arroyo/utils/metrics.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,21 @@ class ConsumerMetricsWrapper(Metrics):
4949
"""
5050
A wrapper around a metrics backend that automatically adds consumer_member_id
5151
to all metrics calls.
52+
53+
Right now we only use this to add tags to the metrics emitted by
54+
StreamProcessor, but ideally all metrics, even those emitted by strategies
55+
and application code, would get this tag. The metrics abstraction in arroyo
56+
is not sufficient for this. We'd have to add an "add_global_tags" method
57+
(similar to the concept of global tags in sentry) and users would have to
58+
implement it.
5259
"""
5360

54-
def __init__(self, metrics: Metrics, consumer_member_id: str) -> None:
61+
def __init__(self, metrics: Metrics) -> None:
5562
self.__metrics = metrics
56-
self.__consumer_member_id = consumer_member_id
63+
self.consumer_member_id = ""
5764

5865
def _add_consumer_tag(self, tags: Optional[Tags]) -> Tags:
59-
"""Add consumer_member_id to the provided tags."""
60-
consumer_tags = {"consumer_member_id": self.__consumer_member_id}
61-
if tags:
62-
return {**consumer_tags, **tags}
63-
return consumer_tags
66+
return {**(tags or {}), "consumer_member_id": self.consumer_member_id}
6467

6568
def increment(
6669
self,
@@ -169,12 +172,12 @@ def get_metrics() -> Metrics:
169172
return _metrics_backend
170173

171174

172-
def get_consumer_metrics(consumer_member_id: str) -> Metrics:
175+
def get_consumer_metrics() -> ConsumerMetricsWrapper:
173176
"""
174177
Get a metrics backend that automatically adds consumer_member_id to all metrics.
175178
"""
176179
base_metrics = get_metrics()
177-
return ConsumerMetricsWrapper(base_metrics, consumer_member_id)
180+
return ConsumerMetricsWrapper(base_metrics)
178181

179182

180183
__all__ = ["configure_metrics", "Metrics", "MetricName", "Tags", "get_consumer_metrics"]

tests/utils/test_metrics.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,19 @@
11
import pytest
22

3-
from arroyo.utils.metrics import Gauge, MetricName, configure_metrics, get_metrics, get_consumer_metrics
3+
from arroyo.utils.metrics import (
4+
Gauge,
5+
MetricName,
6+
configure_metrics,
7+
get_consumer_metrics,
8+
get_metrics,
9+
)
410
from tests.metrics import Gauge as GaugeCall
5-
from tests.metrics import Increment, Timing, TestingMetricsBackend, _TestingMetricsBackend
11+
from tests.metrics import (
12+
Increment,
13+
TestingMetricsBackend,
14+
Timing,
15+
_TestingMetricsBackend,
16+
)
617

718

819
def test_gauge_simple() -> None:
@@ -40,7 +51,8 @@ def test_consumer_metrics_wrapper() -> None:
4051
configure_metrics(backend, force=True)
4152

4253
consumer_member_id = "test-consumer-123"
43-
consumer_metrics = get_consumer_metrics(consumer_member_id)
54+
consumer_metrics = get_consumer_metrics()
55+
consumer_metrics.consumer_member_id = consumer_member_id
4456

4557
# Test increment
4658
consumer_metrics.increment("arroyo.consumer.run.count", 5, tags={"extra": "tag"})
@@ -52,9 +64,21 @@ def test_consumer_metrics_wrapper() -> None:
5264
consumer_metrics.timing("arroyo.consumer.poll.time", 100, tags={"another": "tag"})
5365

5466
expected_calls = [
55-
Increment("arroyo.consumer.run.count", 5, {"consumer_member_id": consumer_member_id, "extra": "tag"}),
56-
GaugeCall("arroyo.consumer.librdkafka.total_queue_size", 10.5, {"consumer_member_id": consumer_member_id}),
57-
Timing("arroyo.consumer.poll.time", 100, {"consumer_member_id": consumer_member_id, "another": "tag"}),
67+
Increment(
68+
"arroyo.consumer.run.count",
69+
5,
70+
{"consumer_member_id": consumer_member_id, "extra": "tag"},
71+
),
72+
GaugeCall(
73+
"arroyo.consumer.librdkafka.total_queue_size",
74+
10.5,
75+
{"consumer_member_id": consumer_member_id},
76+
),
77+
Timing(
78+
"arroyo.consumer.poll.time",
79+
100,
80+
{"consumer_member_id": consumer_member_id, "another": "tag"},
81+
),
5882
]
5983

6084
assert backend.calls == expected_calls

0 commit comments

Comments
 (0)