Skip to content

Commit b9bca3d

Browse files
Add usage tracking metrics for Kafka clients. (#658)
* Add usage tracking metrics for Kafka clients. * Fix double import lint error * [Mega-Linter] Apply linters fixes * Create version util file and add metrics to consumer. * Address linting errors. * Add missing semi-colon. * [Mega-Linter] Apply linters fixes * Bump tests. Co-authored-by: Hannah Stepanek <[email protected]> Co-authored-by: hmstepanek <[email protected]> Co-authored-by: umaannamalai <[email protected]>
1 parent 65246e7 commit b9bca3d

File tree

9 files changed

+60
-18
lines changed

9 files changed

+60
-18
lines changed

newrelic/api/transaction.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ def __init__(self, application, enabled=None, source=None):
186186
self._loop_time = 0.0
187187

188188
self._frameworks = set()
189+
self._message_brokers = set()
189190

190191
self._frozen_path = None
191192

@@ -545,6 +546,10 @@ def __exit__(self, exc, value, tb):
545546
for framework, version in self._frameworks:
546547
self.record_custom_metric("Python/Framework/%s/%s" % (framework, version), 1)
547548

549+
if self._message_brokers:
550+
for message_broker, version in self._message_brokers:
551+
self.record_custom_metric("Python/MessageBroker/%s/%s" % (message_broker, version), 1)
552+
548553
if self._settings.distributed_tracing.enabled:
549554
# Sampled and priority need to be computed at the end of the
550555
# transaction when distributed tracing or span events are enabled.
@@ -1676,6 +1681,10 @@ def add_framework_info(self, name, version=None):
16761681
if name:
16771682
self._frameworks.add((name, version))
16781683

1684+
def add_messagebroker_info(self, name, version=None):
1685+
if name:
1686+
self._message_brokers.add((name, version))
1687+
16791688
def dump(self, file):
16801689
"""Dumps details about the transaction to the file object."""
16811690

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sys
16+
17+
18+
def get_package_version(name):
19+
# importlib was introduced into the standard library starting in Python3.8.
20+
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
21+
return sys.modules["importlib"].metadata.version(name) # pylint: disable=E1101
22+
elif "pkg_resources" in sys.modules:
23+
return sys.modules["pkg_resources"].get_distribution(name).version

newrelic/core/environment.py

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import sysconfig
2424

2525
import newrelic
26+
from newrelic.common.package_version_utils import get_package_version
2627
from newrelic.common.system_info import (
2728
logical_processor_count,
2829
physical_processor_count,
@@ -37,18 +38,6 @@
3738

3839
def environment_settings():
3940
"""Returns an array of arrays of environment settings"""
40-
41-
# Find version resolver.
42-
43-
get_version = None
44-
# importlib was introduced into the standard library starting in Python3.8.
45-
if "importlib" in sys.modules and hasattr(sys.modules["importlib"], "metadata"):
46-
get_version = sys.modules["importlib"].metadata.version
47-
elif "pkg_resources" in sys.modules:
48-
49-
def get_version(name): # pylint: disable=function-redefined
50-
return sys.modules["pkg_resources"].get_distribution(name).version
51-
5241
env = []
5342

5443
# Agent information.
@@ -186,7 +175,7 @@ def get_version(name): # pylint: disable=function-redefined
186175
dispatcher.append(("Dispatcher Version", hypercorn.__version__))
187176
else:
188177
try:
189-
dispatcher.append(("Dispatcher Version", get_version("hypercorn")))
178+
dispatcher.append(("Dispatcher Version", get_package_version("hypercorn")))
190179
except Exception:
191180
pass
192181

@@ -237,7 +226,7 @@ def get_version(name): # pylint: disable=function-redefined
237226
continue
238227

239228
try:
240-
version = get_version(name)
229+
version = get_package_version(name)
241230
plugins.append("%s (%s)" % (name, version))
242231
except Exception:
243232
plugins.append(name)

newrelic/hooks/messagebroker_confluentkafka.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from newrelic.api.time_trace import notice_error
2323
from newrelic.api.transaction import current_transaction
2424
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper
25+
from newrelic.common.package_version_utils import get_package_version
2526

2627
_logger = logging.getLogger(__name__)
2728

@@ -56,6 +57,8 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
5657
else:
5758
topic = kwargs.get("topic", None)
5859

60+
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))
61+
5962
with MessageTrace(
6063
library="Kafka",
6164
operation="Produce",
@@ -161,6 +164,7 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
161164
name = "Named/%s" % destination_name
162165
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
163166
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
167+
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))
164168

165169
return record
166170

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
function_wrapper,
2727
wrap_function_wrapper,
2828
)
29+
from newrelic.common.package_version_utils import get_package_version
2930

3031
HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
3132
HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
@@ -48,6 +49,8 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
4849
topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
4950
headers = list(headers) if headers else []
5051

52+
transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))
53+
5154
with MessageTrace(
5255
library="Kafka",
5356
operation="Produce",
@@ -112,6 +115,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
112115
message_count = 1
113116

114117
transaction = current_transaction(active_only=False)
118+
115119
if not transaction:
116120
transaction = MessageTransaction(
117121
application=application_instance(),
@@ -124,7 +128,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
124128
source=wrapped,
125129
)
126130
instance._nr_transaction = transaction
127-
transaction.__enter__()
131+
transaction.__enter__() # pylint: disable=C2801
128132

129133
# Obtain consumer client_id to send up as agent attribute
130134
if hasattr(instance, "config") and "client_id" in instance.config:
@@ -143,12 +147,13 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
143147
name = "Named/%s" % destination_name
144148
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
145149
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
150+
transaction.add_messagebroker_info("Kafka-Python", get_package_version("kafka-python"))
146151

147152
return record
148153

149154

150155
def wrap_KafkaProducer_init(wrapped, instance, args, kwargs):
151-
get_config_key = lambda key: kwargs.get(key, instance.DEFAULT_CONFIG[key]) # noqa: E731
156+
get_config_key = lambda key: kwargs.get(key, instance.DEFAULT_CONFIG[key]) # pylint: disable=C3001 # noqa: E731
152157

153158
kwargs["key_serializer"] = wrap_serializer(
154159
instance, "Serialization/Key", "MessageBroker", get_config_key("key_serializer")
@@ -162,13 +167,13 @@ def wrap_KafkaProducer_init(wrapped, instance, args, kwargs):
162167

163168
class NewRelicSerializerWrapper(ObjectProxy):
164169
def __init__(self, wrapped, serializer_name, group_prefix):
165-
ObjectProxy.__init__.__get__(self)(wrapped)
170+
ObjectProxy.__init__.__get__(self)(wrapped) # pylint: disable=W0231
166171

167172
self._nr_serializer_name = serializer_name
168173
self._nr_group_prefix = group_prefix
169174

170175
def serialize(self, topic, object):
171-
wrapped = self.__wrapped__.serialize
176+
wrapped = self.__wrapped__.serialize # pylint: disable=W0622
172177
args = (topic, object)
173178
kwargs = {}
174179

tests/messagebroker_confluentkafka/test_consumer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def _test():
6363

6464

6565
def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
66+
from confluent_kafka import __version__ as version
67+
6668
transaction_name = (
6769
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
6870
)
@@ -72,6 +74,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
7274
custom_metrics=[
7375
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
7476
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
77+
("Python/MessageBroker/Confluent-Kafka/%s" % version, 1),
7578
],
7679
background_task=True,
7780
)

tests/messagebroker_confluentkafka/test_producer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ def producer_callback(err, msg):
6565

6666

6767
def test_trace_metrics(topic, send_producer_message):
68+
from confluent_kafka import __version__ as version
69+
6870
scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
6971
unscoped_metrics = scoped_metrics
7072
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
@@ -73,6 +75,7 @@ def test_trace_metrics(topic, send_producer_message):
7375
txn_name,
7476
scoped_metrics=scoped_metrics,
7577
rollup_metrics=unscoped_metrics,
78+
custom_metrics=[("Python/MessageBroker/Confluent-Kafka/%s" % version, 1)],
7679
background_task=True,
7780
)
7881
@background_task()

tests/messagebroker_kafkapython/test_consumer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ def _test():
6060

6161

6262
def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
63+
from kafka.version import __version__ as version
64+
6365
transaction_name = (
6466
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
6567
)
@@ -69,6 +71,7 @@ def test_custom_metrics_on_existing_transaction(get_consumer_record, topic):
6971
custom_metrics=[
7072
("Message/Kafka/Topic/Named/%s/Received/Bytes" % topic, 1),
7173
("Message/Kafka/Topic/Named/%s/Received/Messages" % topic, 1),
74+
("Python/MessageBroker/Kafka-Python/%s" % version, 1),
7275
],
7376
background_task=True,
7477
)

tests/messagebroker_kafkapython/test_producer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929

3030
def test_trace_metrics(topic, send_producer_message):
31+
from kafka.version import __version__ as version
32+
3133
scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 1)]
3234
unscoped_metrics = scoped_metrics
3335
txn_name = "test_producer:test_trace_metrics.<locals>.test" if six.PY3 else "test_producer:test"
@@ -36,6 +38,7 @@ def test_trace_metrics(topic, send_producer_message):
3638
txn_name,
3739
scoped_metrics=scoped_metrics,
3840
rollup_metrics=unscoped_metrics,
41+
custom_metrics=[("Python/MessageBroker/Kafka-Python/%s" % version, 1)],
3942
background_task=True,
4043
)
4144
@background_task()

0 commit comments

Comments
 (0)