Skip to content

Commit 411b980

Browse files
hmstepanekumaannamalaiTimPansinolrafeei
committed
Add kafka python consumer (#611)
* Add kafka-python consumer __next__ instrumentation Co-authored-by: UmaAnnamalai <[email protected]> Co-authored-by: TimPansino <[email protected]> Co-authored-by: LallehRafeei <[email protected]> * Replace iter(next()) with for loop The consumer instrumentation of __next__ relies on __next__ getting called until StopIteration is raised. If this does not happen, the current transaction is not ended. Because the heartbeat tests were not calling __next__ a second time to end the current transaction, this caused failures in the consumer tests where the previous transaction was not getting closed. Because the newrelic application is per session rather than per function, the unclosed transaction inside the heartbeat tests were impacting the consumer tests which ran right after. Co-authored-by: LallehRafeei <[email protected]> * Remove duplicated module has attr Co-authored-by: LallehRafeei <[email protected]> Co-authored-by: UmaAnnamalai <[email protected]> Co-authored-by: TimPansino <[email protected]> Co-authored-by: LallehRafeei <[email protected]>
1 parent 3b89bbf commit 411b980

File tree

5 files changed

+326
-55
lines changed

5 files changed

+326
-55
lines changed

newrelic/api/message_transaction.py

Lines changed: 103 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,33 +19,39 @@
1919
from newrelic.api.background_task import BackgroundTask
2020
from newrelic.api.message_trace import MessageTrace
2121
from newrelic.api.transaction import current_transaction
22+
from newrelic.common.async_proxy import TransactionContext, async_proxy
2223
from newrelic.common.object_wrapper import FunctionWrapper, wrap_object
23-
from newrelic.common.async_proxy import async_proxy, TransactionContext
2424

2525

2626
class MessageTransaction(BackgroundTask):
27-
28-
def __init__(self, library, destination_type,
29-
destination_name, application, routing_key=None,
30-
exchange_type=None, headers=None, queue_name=None, reply_to=None,
31-
correlation_id=None, source=None):
32-
33-
name, group = self.get_transaction_name(library, destination_type,
34-
destination_name)
35-
36-
super(MessageTransaction, self).__init__(application, name,
37-
group=group, source=source)
27+
def __init__(
28+
self,
29+
library,
30+
destination_type,
31+
destination_name,
32+
application,
33+
routing_key=None,
34+
exchange_type=None,
35+
headers=None,
36+
queue_name=None,
37+
reply_to=None,
38+
correlation_id=None,
39+
transport_type="AMQP",
40+
source=None,
41+
):
42+
43+
name, group = self.get_transaction_name(library, destination_type, destination_name)
44+
45+
super(MessageTransaction, self).__init__(application, name, group=group, source=source)
3846

3947
self.headers = headers
4048

4149
if headers is not None and self.settings is not None:
4250
if self.settings.distributed_tracing.enabled:
43-
self.accept_distributed_trace_headers(
44-
headers, transport_type='AMQP')
51+
self.accept_distributed_trace_headers(headers, transport_type=transport_type)
4552
elif self.settings.cross_application_tracer.enabled:
4653
self._process_incoming_cat_headers(
47-
headers.pop(MessageTrace.cat_id_key, None),
48-
headers.pop(MessageTrace.cat_transaction_key, None)
54+
headers.pop(MessageTrace.cat_id_key, None), headers.pop(MessageTrace.cat_transaction_key, None)
4955
)
5056

5157
self.routing_key = routing_key
@@ -56,37 +62,45 @@ def __init__(self, library, destination_type,
5662

5763
@staticmethod
5864
def get_transaction_name(library, destination_type, destination_name):
59-
group = 'Message/%s/%s' % (library, destination_type)
60-
name = 'Named/%s' % destination_name
65+
group = "Message/%s/%s" % (library, destination_type)
66+
name = "Named/%s" % destination_name
6167
return name, group
6268

6369
def _update_agent_attributes(self):
6470
ms_attrs = self._agent_attributes
6571

6672
if self.exchange_type is not None:
67-
ms_attrs['message.exchangeType'] = self.exchange_type
73+
ms_attrs["message.exchangeType"] = self.exchange_type
6874
if self.queue_name is not None:
69-
ms_attrs['message.queueName'] = self.queue_name
75+
ms_attrs["message.queueName"] = self.queue_name
7076
if self.reply_to is not None:
71-
ms_attrs['message.replyTo'] = self.reply_to
77+
ms_attrs["message.replyTo"] = self.reply_to
7278
if self.correlation_id is not None:
73-
ms_attrs['message.correlationId'] = self.correlation_id
79+
ms_attrs["message.correlationId"] = self.correlation_id
7480
if self.headers:
7581
for k, v in self.headers.items():
76-
new_key = 'message.headers.%s' % k
82+
new_key = "message.headers.%s" % k
7783
new_val = str(v)
7884
ms_attrs[new_key] = new_val
7985
if self.routing_key is not None:
80-
ms_attrs['message.routingKey'] = self.routing_key
86+
ms_attrs["message.routingKey"] = self.routing_key
8187

8288
super(MessageTransaction, self)._update_agent_attributes()
8389

8490

85-
def MessageTransactionWrapper(wrapped, library, destination_type,
86-
destination_name, application=None, routing_key=None,
87-
exchange_type=None, headers=None, queue_name=None, reply_to=None,
88-
correlation_id=None):
89-
91+
def MessageTransactionWrapper(
92+
wrapped,
93+
library,
94+
destination_type,
95+
destination_name,
96+
application=None,
97+
routing_key=None,
98+
exchange_type=None,
99+
headers=None,
100+
queue_name=None,
101+
reply_to=None,
102+
correlation_id=None,
103+
):
90104
def wrapper(wrapped, instance, args, kwargs):
91105
if callable(library):
92106
if instance is not None:
@@ -173,9 +187,8 @@ def create_transaction(transaction):
173187
if not transaction.background_task:
174188
transaction.background_task = True
175189
transaction.set_transaction_name(
176-
*MessageTransaction.get_transaction_name(
177-
_library, _destination_type,
178-
_destination_name))
190+
*MessageTransaction.get_transaction_name(_library, _destination_type, _destination_name)
191+
)
179192

180193
return None
181194

@@ -233,22 +246,61 @@ def create_transaction(transaction):
233246
return FunctionWrapper(wrapped, wrapper)
234247

235248

236-
def message_transaction(library, destination_type, destination_name,
237-
application=None, routing_key=None, exchange_type=None, headers=None,
238-
queue_name=None, reply_to=None, correlation_id=None):
239-
return functools.partial(MessageTransactionWrapper,
240-
library=library, destination_type=destination_type,
241-
destination_name=destination_name, application=application,
242-
routing_key=routing_key, exchange_type=exchange_type,
243-
headers=headers, queue_name=queue_name, reply_to=reply_to,
244-
correlation_id=correlation_id)
245-
246-
247-
def wrap_message_transaction(module, object_path, library, destination_type,
248-
destination_name, application=None, routing_key=None,
249-
exchange_type=None, headers=None, queue_name=None, reply_to=None,
250-
correlation_id=None):
251-
wrap_object(module, object_path, MessageTransactionWrapper,
252-
(library, destination_type, destination_name, application,
253-
routing_key, exchange_type, headers, queue_name, reply_to,
254-
correlation_id))
249+
def message_transaction(
250+
library,
251+
destination_type,
252+
destination_name,
253+
application=None,
254+
routing_key=None,
255+
exchange_type=None,
256+
headers=None,
257+
queue_name=None,
258+
reply_to=None,
259+
correlation_id=None,
260+
):
261+
return functools.partial(
262+
MessageTransactionWrapper,
263+
library=library,
264+
destination_type=destination_type,
265+
destination_name=destination_name,
266+
application=application,
267+
routing_key=routing_key,
268+
exchange_type=exchange_type,
269+
headers=headers,
270+
queue_name=queue_name,
271+
reply_to=reply_to,
272+
correlation_id=correlation_id,
273+
)
274+
275+
276+
def wrap_message_transaction(
277+
module,
278+
object_path,
279+
library,
280+
destination_type,
281+
destination_name,
282+
application=None,
283+
routing_key=None,
284+
exchange_type=None,
285+
headers=None,
286+
queue_name=None,
287+
reply_to=None,
288+
correlation_id=None,
289+
):
290+
wrap_object(
291+
module,
292+
object_path,
293+
MessageTransactionWrapper,
294+
(
295+
library,
296+
destination_type,
297+
destination_name,
298+
application,
299+
routing_key,
300+
exchange_type,
301+
headers,
302+
queue_name,
303+
reply_to,
304+
correlation_id,
305+
),
306+
)

newrelic/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,12 @@ def _process_module_builtin_defaults():
23072307
"instrument_cherrypy__cptree",
23082308
)
23092309

2310+
_process_module_definition(
2311+
"kafka.consumer.group",
2312+
"newrelic.hooks.messagebroker_kafkapython",
2313+
"instrument_kafka_consumer_group",
2314+
)
2315+
23102316
_process_module_definition(
23112317
"kafka.producer.kafka",
23122318
"newrelic.hooks.messagebroker_kafkapython",

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 80 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
1514
import logging
1615
import math
16+
import sys
1717
import threading
1818

1919
from kafka.metrics.metrics_reporter import AbstractMetricsReporter
2020

2121
import newrelic.core.agent
2222
from newrelic.api.application import application_instance
2323
from newrelic.api.message_trace import MessageTrace
24+
from newrelic.api.message_transaction import MessageTransaction
2425
from newrelic.api.time_trace import notice_error
2526
from newrelic.api.transaction import current_transaction
2627
from newrelic.common.object_wrapper import wrap_function_wrapper
@@ -109,6 +110,83 @@ def instrument_kafka_heartbeat(module):
109110
)
110111

111112

113+
def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
114+
if hasattr(instance, "_nr_transaction") and not instance._nr_transaction.stopped:
115+
instance._nr_transaction.__exit__(*sys.exc_info())
116+
117+
try:
118+
record = wrapped(*args, **kwargs)
119+
except Exception as e:
120+
# StopIteration is an expected error, indicating the end of an iterable,
121+
# that should not be captured.
122+
if not isinstance(e, StopIteration):
123+
notice_error()
124+
raise
125+
126+
if record:
127+
# This iterator can be called either outside of a transaction, or
128+
# within the context of an existing transaction. There are 3
129+
# possibilities we need to handle: (Note that this is similar to
130+
# our Pika and Celery instrumentation)
131+
#
132+
# 1. In an inactive transaction
133+
#
134+
# If the end_of_transaction() or ignore_transaction() API
135+
# calls have been invoked, this iterator may be called in the
136+
# context of an inactive transaction. In this case, don't wrap
137+
# the iterator in any way. Just run the original iterator.
138+
#
139+
# 2. In an active transaction
140+
#
141+
# Do nothing.
142+
#
143+
# 3. Outside of a transaction
144+
#
145+
# Since it's not running inside of an existing transaction, we
146+
# want to create a new background transaction for it.
147+
148+
library = "Kafka"
149+
destination_type = "Topic"
150+
destination_name = record.topic
151+
received_bytes = len(str(record.value).encode("utf-8"))
152+
message_count = 1
153+
154+
transaction = current_transaction(active_only=False)
155+
if not transaction:
156+
transaction = MessageTransaction(
157+
application=application_instance(),
158+
library=library,
159+
destination_type=destination_type,
160+
destination_name=destination_name,
161+
headers=record.headers,
162+
transport_type="Kafka",
163+
routing_key=record.key,
164+
source=wrapped,
165+
)
166+
instance._nr_transaction = transaction
167+
transaction.__enter__()
168+
169+
# Obtain consumer client_id to send up as agent attribute
170+
if hasattr(instance, "config") and "client_id" in instance.config:
171+
client_id = instance.config["client_id"]
172+
transaction._add_agent_attribute("kafka.consume.client_id", client_id)
173+
174+
transaction._add_agent_attribute("kafka.consume.byteCount", received_bytes)
175+
176+
transaction = current_transaction()
177+
if transaction: # If there is an active transaction now.
178+
# Add metrics whether or not a transaction was already active, or one was just started.
179+
# Don't add metrics if there was an inactive transaction.
180+
# Name the metrics using the same format as the transaction, but in case the active transaction
181+
# was an existing one and not a message transaction, reproduce the naming logic here.
182+
group = "Message/%s/%s" % (library, destination_type)
183+
name = "Named/%s" % destination_name
184+
transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
185+
transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)
186+
187+
return record
188+
189+
112190
class KafkaMetricsDataSource(object):
113191
_instance = None
114192

@@ -240,11 +318,10 @@ def wrap_KafkaProducerConsumer_init(wrapped, instance, args, kwargs):
240318
def instrument_kafka_producer(module):
241319
if hasattr(module, "KafkaProducer"):
242320
wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducerConsumer_init)
243-
244-
if hasattr(module, "KafkaProducer"):
245321
wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)
246322

247323

248324
def instrument_kafka_consumer_group(module):
249325
if hasattr(module, "KafkaConsumer"):
250326
wrap_function_wrapper(module, "KafkaConsumer.__init__", wrap_KafkaProducerConsumer_init)
327+
wrap_function_wrapper(module.KafkaConsumer, "__next__", wrap_kafkaconsumer_next)

tests/messagebroker_kafkapython/test_heartbeat.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ def test_successful_heartbeat_metrics_recorded(topic, producer, consumer):
3434
producer.send(topic, value=1)
3535
producer.flush()
3636

37-
next(iter(consumer))
37+
for record in consumer:
38+
pass
3839
time.sleep(1.5)
3940

4041

0 commit comments

Comments
 (0)