Commit 3b89bbf

TimPansino, lrafeei, and hmstepanek committed

Kafka Internal Metrics Instrumentation (#613)

* Kafka Metrics data source
  Co-authored-by: Lalleh Rafeei <[email protected]>
  Co-authored-by: Hannah Stepanek <[email protected]>
* [Mega-Linter] Apply linters fixes
* Remove breakpoint
* Run linter
* Fix black issues
* Swap @classmethod decorator order.
  Co-authored-by: Hannah Stepanek <[email protected]>
* Remove variable named variable
  Co-authored-by: Lalleh Rafeei <[email protected]>
  Co-authored-by: Hannah Stepanek <[email protected]>

Co-authored-by: TimPansino <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
1 parent ef56ff7 commit 3b89bbf

File tree

4 files changed: +296 -8 lines changed


newrelic/config.py

Lines changed: 6 additions & 0 deletions
@@ -2318,6 +2318,12 @@ def _process_module_builtin_defaults():
         "instrument_kafka_heartbeat",
     )
 
+    _process_module_definition(
+        "kafka.consumer.group",
+        "newrelic.hooks.messagebroker_kafkapython",
+        "instrument_kafka_consumer_group",
+    )
+
     _process_module_definition(
         "logging",
         "newrelic.hooks.logger_logging",

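For context, _process_module_definition registers an import hook: when the named module is first imported, the agent loads the listed hook module and calls the listed function with the imported module. A hedged sketch of the net effect of the new registration (illustrative only, not part of this commit):

    # Roughly what the agent's import-hook machinery ends up doing once
    # kafka.consumer.group is imported (sketch, assuming the agent is initialized).
    import kafka.consumer.group as consumer_group_module

    from newrelic.hooks.messagebroker_kafkapython import instrument_kafka_consumer_group

    instrument_kafka_consumer_group(consumer_group_module)  # wraps KafkaConsumer.__init__
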
newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 152 additions & 5 deletions
@@ -12,11 +12,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+import math
+import threading
+
+from kafka.metrics.metrics_reporter import AbstractMetricsReporter
+
+import newrelic.core.agent
 from newrelic.api.application import application_instance
 from newrelic.api.message_trace import MessageTrace
 from newrelic.api.time_trace import notice_error
 from newrelic.api.transaction import current_transaction
 from newrelic.common.object_wrapper import wrap_function_wrapper
+from newrelic.packages import six
+from newrelic.samplers.decorators import data_source_factory
+
+_logger = logging.getLogger(__name__)
 
 HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
 HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"

@@ -55,11 +66,6 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
         raise
 
 
-def instrument_kafka_producer(module):
-    if hasattr(module, "KafkaProducer"):
-        wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)
-
-
 def metric_wrapper(metric_name, check_result=False):
     def _metric_wrapper(wrapped, instance, args, kwargs):
         result = wrapped(*args, **kwargs)

@@ -101,3 +107,144 @@ def instrument_kafka_heartbeat(module):
     wrap_function_wrapper(
         module, "Heartbeat.poll_timeout_expired", metric_wrapper(HEARTBEAT_POLL_TIMEOUT, check_result=True)
     )
+
+
+class KafkaMetricsDataSource(object):
+    _instance = None
+
+    def __init__(self):
+        self.reporters = []
+
+    @classmethod
+    @data_source_factory(name="Kafka Metrics Reporter")
+    def factory(cls, settings=None, environ=None):
+        return cls.singleton()
+
+    @classmethod
+    def singleton(cls, register=True):
+        # If already initialized, exit early
+        if cls._instance:
+            return cls._instance
+
+        # Init and register instance on class
+        instance = cls()
+        cls._instance = instance
+
+        # register_data_source takes a callable so let it rerun singleton to retrieve the instance
+        if register:
+            try:
+                _logger.debug("Registering kafka metrics data source.")
+                newrelic.core.agent.agent_instance().register_data_source(cls.factory)
+            except Exception:
+                _logger.exception(
+                    "Attempt to register kafka metrics data source has failed. Data source will be skipped."
+                )
+
+        return instance
+
+    def register(self, reporter):
+        self.reporters.append(reporter)
+
+    def unregister(self, reporter):
+        if reporter in self.reporters:
+            self.reporters.remove(reporter)
+
+    def start(self):
+        return
+
+    def stop(self):
+        # Clear references to reporters to prevent them from participating in a reference cycle.
+        self.reporters = []
+
+    def __call__(self):
+        for reporter in self.reporters:
+            for name, metric in six.iteritems(reporter.snapshot()):
+                yield name, metric
+
+
+class NewRelicMetricsReporter(AbstractMetricsReporter):
+    def __init__(self, *args, **kwargs):
+        super(NewRelicMetricsReporter, self).__init__(*args, **kwargs)
+
+        # Register with data source for harvesting
+        self.data_source = KafkaMetricsDataSource.singleton()
+        self.data_source.register(self)
+
+        self._metrics = {}
+        self._lock = threading.Lock()
+
+    def close(self, *args, **kwargs):
+        self.data_source.unregister(self)
+        with self._lock:
+            self._metrics = {}
+
+    def init(self, metrics):
+        for metric in metrics:
+            self.metric_change(metric)
+
+    @staticmethod
+    def invalid_metric_value(metric):
+        name, value = metric
+        return not any((math.isinf(value), math.isnan(value), value == 0))
+
+    def snapshot(self):
+        with self._lock:
+            # metric.value can only be called once, so care must be taken when filtering
+            metrics = ((name, metric.value()) for name, metric in six.iteritems(self._metrics))
+            return {
+                "MessageBroker/Kafka/Internal/%s" % name: {"count": value}
+                for name, value in filter(self.invalid_metric_value, metrics)
+            }
+
+    def get_metric_name(self, metric):
+        metric_name = metric.metric_name  # Get MetricName object to work with
+
+        name = metric_name.name
+        group = metric_name.group
+
+        if "topic" in metric_name.tags:
+            topic = metric_name.tags["topic"]
+            return "/".join((group, topic, name))
+        else:
+            return "/".join((group, name))
+
+    def metric_change(self, metric):
+        name = self.get_metric_name(metric)
+        with self._lock:
+            self._metrics[name] = metric
+
+    def metric_removal(self, metric):
+        name = self.get_metric_name(metric)
+        with self._lock:
+            if name in self._metrics:
+                self._metrics.pop(name)
+
+    def configure(self, configs):
+        return
+
+
+def wrap_KafkaProducerConsumer_init(wrapped, instance, args, kwargs):
+    try:
+        if "metric_reporters" in kwargs:
+            metric_reporters = list(kwargs.get("metric_reporters", []))
+            metric_reporters.append(NewRelicMetricsReporter)
+            kwargs["metric_reporters"] = [metric_reporters]
+        else:
+            kwargs["metric_reporters"] = [NewRelicMetricsReporter]
+    except Exception:
+        pass
+
+    return wrapped(*args, **kwargs)
+
+
+def instrument_kafka_producer(module):
+    if hasattr(module, "KafkaProducer"):
+        wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducerConsumer_init)
+
+    if hasattr(module, "KafkaProducer"):
+        wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)
+
+
+def instrument_kafka_consumer_group(module):
+    if hasattr(module, "KafkaConsumer"):
+        wrap_function_wrapper(module, "KafkaConsumer.__init__", wrap_KafkaProducerConsumer_init)
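Taken together, these hooks mean that constructing a producer or consumer after the agent is initialized is enough to attach the reporter and feed kafka-python's internal metrics into the agent's data-source harvest. A minimal sketch of that flow (illustrative only; the broker address is a placeholder, not part of this commit):

    # Sketch, assuming an initialized agent and a reachable broker at localhost:9092.
    import kafka

    from newrelic.hooks.messagebroker_kafkapython import KafkaMetricsDataSource

    # The wrapped KafkaProducer.__init__ appends NewRelicMetricsReporter to
    # metric_reporters, so kafka-python delivers its internal metrics to it.
    producer = kafka.KafkaProducer(bootstrap_servers="localhost:9092")

    # On each harvest the registered data source is called; it drains every
    # reporter's snapshot() into MessageBroker/Kafka/Internal/* metrics.
    # register=False just retrieves the existing singleton without registering again.
    data_source = KafkaMetricsDataSource.singleton(register=False)
    for name, value in data_source():
        print(name, value)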

tests/messagebroker_kafkapython/conftest.py

Lines changed: 15 additions & 3 deletions
@@ -18,14 +18,15 @@
 import kafka
 import pytest
 from testing_support.db_settings import kafka_settings
-from testing_support.fixtures import (  # noqa: F401
+from testing_support.fixtures import (  # noqa: F401, W0611
     code_coverage_fixture,
     collector_agent_registration_fixture,
     collector_available_fixture,
 )
 
 from newrelic.api.transaction import current_transaction
 from newrelic.common.object_wrapper import transient_function_wrapper
+from newrelic.hooks.messagebroker_kafkapython import KafkaMetricsDataSource
 
 DB_SETTINGS = kafka_settings()[0]
 

@@ -54,7 +55,7 @@
 
 
 @pytest.fixture(scope="function")
-def producer():
+def producer(data_source):
     producer = kafka.KafkaProducer(
         bootstrap_servers=BROKER, api_version=(2, 0, 2), value_serializer=lambda v: json.dumps(v).encode("utf-8")
     )

@@ -63,7 +64,7 @@ def producer():
 
 
 @pytest.fixture(scope="function")
-def consumer(topic):
+def consumer(topic, data_source):
     consumer = kafka.KafkaConsumer(
         topic,
         bootstrap_servers=BROKER,

@@ -82,6 +83,17 @@ def topic():
     yield "test-topic-%s" % str(uuid.uuid4())
 
 
+@pytest.fixture(scope="session")
+def data_source():
+    """
+    Must be required by consumer and producer fixtures, or the first one of them to be
+    instantiated will create and register the singleton. We rely on the singleton to
+    not be registered to properly test the output of it without interference from the
+    harvest thread.
+    """
+    return KafkaMetricsDataSource.singleton(register=False)
+
+
 @transient_function_wrapper(kafka.producer.kafka, "KafkaProducer.send.__wrapped__")
 # Place transient wrapper underneath instrumentation
 def cache_kafka_headers(wrapped, instance, args, kwargs):
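The data_source fixture's docstring describes an ordering dependency worth spelling out: the first NewRelicMetricsReporter constructed calls KafkaMetricsDataSource.singleton() with register=True, handing the singleton to the agent's harvest. A hedged illustration of that ordering (not part of the commit):

    # If the fixture runs first, the singleton is created unregistered and the
    # tests can drain it directly without the harvest thread consuming snapshots:
    data_source = KafkaMetricsDataSource.singleton(register=False)

    # If a producer or consumer were built first instead, its NewRelicMetricsReporter
    # would call KafkaMetricsDataSource.singleton() and register the data source
    # with the agent, so the harvest thread would compete for the same snapshots.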
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
+# Copyright 2010 New Relic, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def test_data_source_metrics(data_source, topic, producer, consumer):
+    producer.send(topic, value=1)
+    producer.flush()
+    next(iter(consumer))
+
+    metrics = dict(data_source())
+    metric_names = list(metrics.keys())
+    assert metrics
+
+
+# Example metrics
+
+# MessageBroker/Kafka/Internal/kafka-metrics-count/count
+# MessageBroker/Kafka/Internal/producer-metrics/connection-close-rate
+# MessageBroker/Kafka/Internal/producer-metrics/connection-creation-rate
+# MessageBroker/Kafka/Internal/producer-metrics/select-rate
+# MessageBroker/Kafka/Internal/producer-metrics/io-wait-time-ns-avg
+# MessageBroker/Kafka/Internal/producer-metrics/io-wait-ratio
+# MessageBroker/Kafka/Internal/producer-metrics/io-time-ns-avg
+# MessageBroker/Kafka/Internal/producer-metrics/io-ratio
+# MessageBroker/Kafka/Internal/producer-metrics/connection-count
+# MessageBroker/Kafka/Internal/producer-metrics/batch-size-avg
+# MessageBroker/Kafka/Internal/producer-metrics/batch-size-max
+# MessageBroker/Kafka/Internal/producer-metrics/compression-rate-avg
+# MessageBroker/Kafka/Internal/producer-metrics/record-queue-time-avg
+# MessageBroker/Kafka/Internal/producer-metrics/record-queue-time-max
+# MessageBroker/Kafka/Internal/producer-metrics/record-send-rate
+# MessageBroker/Kafka/Internal/producer-metrics/records-per-request-avg
+# MessageBroker/Kafka/Internal/producer-metrics/byte-rate
+# MessageBroker/Kafka/Internal/producer-metrics/record-size-max
+# MessageBroker/Kafka/Internal/producer-metrics/record-size-avg
+# MessageBroker/Kafka/Internal/producer-metrics/metadata-age
+# MessageBroker/Kafka/Internal/producer-metrics/network-io-rate
+# MessageBroker/Kafka/Internal/producer-metrics/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/producer-metrics/request-rate
+# MessageBroker/Kafka/Internal/producer-metrics/request-size-avg
+# MessageBroker/Kafka/Internal/producer-metrics/request-size-max
+# MessageBroker/Kafka/Internal/producer-metrics/incoming-byte-rate
+# MessageBroker/Kafka/Internal/producer-metrics/response-rate
+# MessageBroker/Kafka/Internal/producer-metrics/request-latency-avg
+# MessageBroker/Kafka/Internal/producer-metrics/request-latency-max
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-size-avg
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-size-max
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/incoming-byte-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/response-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-latency-avg
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-latency-max
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-size-avg
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-size-max
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/incoming-byte-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/response-rate
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-latency-avg
+# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-latency-max
+# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/record-send-rate
+# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/byte-rate
+# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/compression-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/connection-close-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/connection-creation-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/select-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/io-wait-time-ns-avg
+# MessageBroker/Kafka/Internal/consumer-metrics/io-wait-ratio
+# MessageBroker/Kafka/Internal/consumer-metrics/io-time-ns-avg
+# MessageBroker/Kafka/Internal/consumer-metrics/io-ratio
+# MessageBroker/Kafka/Internal/consumer-metrics/connection-count
+# MessageBroker/Kafka/Internal/consumer-metrics/network-io-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/request-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/request-size-avg
+# MessageBroker/Kafka/Internal/consumer-metrics/request-size-max
+# MessageBroker/Kafka/Internal/consumer-metrics/incoming-byte-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/response-rate
+# MessageBroker/Kafka/Internal/consumer-metrics/request-latency-avg
+# MessageBroker/Kafka/Internal/consumer-metrics/request-latency-max
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-size-avg
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-size-max
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/incoming-byte-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/response-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-latency-avg
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-latency-max
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-size-avg
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-size-max
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/bytes-consumed-rate
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-per-request-avg
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-consumed-rate
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-latency-avg
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-latency-max
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-rate
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-lag-max
+# MessageBroker/Kafka/Internal/consumer-coordinator-metrics/assigned-partitions
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/outgoing-byte-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-size-avg
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-size-max
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/incoming-byte-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/response-rate
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-latency-avg
+# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-latency-max
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/fetch-size-avg
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/fetch-size-max
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/bytes-consumed-rate
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/records-per-request-avg
+# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/records-consumed-rate
