
Commit 75a8c6e

hmstepanek, TimPansino, lrafeei, and uannamalai committed

Confluent Kafka Instrumentation (#620)
* Confluent-Kafka requires librdkafka-dev to compile
* Add confluent-kafka test env
* Add support for confluent-kafka
* Uncomment admin client in kafka-python fixture
* Run kafka tests serially
* Attempt to fix flakey test failures

Co-authored-by: Timothy Pansino <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Uma Annamalai <[email protected]>
1 parent a32cc29 commit 75a8c6e
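The first bullet of the commit message notes that confluent-kafka needs librdkafka-dev to build from source. As an illustrative aside (not part of this commit), the standard confluent_kafka API can confirm that an installed wheel is linked against librdkafka:

    # Illustrative sanity check: confirm confluent-kafka is linked against librdkafka.
    import confluent_kafka

    print("client version:", confluent_kafka.version())        # (version string, int) for the Python client
    print("librdkafka version:", confluent_kafka.libversion()) # (version string, int) for the underlying C library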

File tree

9 files changed: +950 -13 lines


.github/workflows/tests.yml

Lines changed: 15 additions & 3 deletions
@@ -435,12 +435,12 @@ jobs:
   kafka:
     env:
-      TOTAL_GROUPS: 1
+      TOTAL_GROUPS: 2

     strategy:
       fail-fast: false
       matrix:
-        group-number: [1]
+        group-number: [1, 2]

     runs-on: ubuntu-latest
     timeout-minutes: 30
@@ -472,6 +472,18 @@ jobs:
       - uses: actions/checkout@v3
       - uses: ./.github/actions/setup-python-matrix

+      # Special case packages
+      - name: Install librdkafka-dev
+        run: |
+          # Use lsb-release to find the codename of Ubuntu to use to install the correct library name
+          sudo apt-get update
+          sudo ln -fs /usr/share/zoneinfo/America/Los_Angeles /etc/localtime
+          sudo apt-get install -y wget gnupg2 software-properties-common
+          sudo wget -qO - https://packages.confluent.io/deb/7.2/archive.key | sudo apt-key add -
+          sudo add-apt-repository "deb https://packages.confluent.io/clients/deb $(lsb_release -cs) main"
+          sudo apt-get update
+          sudo apt-get install -y librdkafka-dev/$(lsb_release -c | cut -f 2)
+
       - name: Get Environments
         id: get-envs
         run: |
@@ -481,7 +493,7 @@ jobs:
       - name: Test
         run: |
-          tox -vv -e ${{ steps.get-envs.outputs.envs }} -p auto
+          tox -vv -e ${{ steps.get-envs.outputs.envs }}
         env:
           TOX_PARALLEL_NO_SPINNER: 1
           PY_COLORS: 0
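For context on the matrix change above: TOTAL_GROUPS and group-number split the kafka tox environments across two CI jobs that each run their environments serially (the "-p auto" flag is dropped from the tox invocation). A minimal sketch of how such a split could work, assuming a simple round-robin assignment; the actual "Get Environments" script is not part of this diff, and the environment names and variable names below are placeholders:

    # Hypothetical sketch of splitting tox environments across matrix groups.
    # The real logic lives in the "Get Environments" step, which is not shown here.
    import os

    def split_envs(envs, group_number, total_groups):
        """Return the slice of tox environments assigned to this 1-indexed matrix group."""
        return [env for i, env in enumerate(envs) if i % total_groups == group_number - 1]

    if __name__ == "__main__":
        all_envs = [
            "kafka-confluentkafka-py37",
            "kafka-confluentkafka-py310",
            "kafka-kafkapython-py37",
            "kafka-kafkapython-py310",
        ]
        group = int(os.environ.get("GROUP_NUMBER", "1"))
        total = int(os.environ.get("TOTAL_GROUPS", "2"))
        print(",".join(split_envs(all_envs, group, total)))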

newrelic/config.py

Lines changed: 16 additions & 2 deletions
@@ -2307,12 +2307,27 @@ def _process_module_builtin_defaults():
         "instrument_cherrypy__cptree",
     )

+    _process_module_definition(
+        "confluent_kafka.cimpl",
+        "newrelic.hooks.messagebroker_confluentkafka",
+        "instrument_confluentkafka_cimpl",
+    )
+    _process_module_definition(
+        "confluent_kafka.serializing_producer",
+        "newrelic.hooks.messagebroker_confluentkafka",
+        "instrument_confluentkafka_serializing_producer",
+    )
+    _process_module_definition(
+        "confluent_kafka.deserializing_consumer",
+        "newrelic.hooks.messagebroker_confluentkafka",
+        "instrument_confluentkafka_deserializing_consumer",
+    )
+
     _process_module_definition(
         "kafka.consumer.group",
         "newrelic.hooks.messagebroker_kafkapython",
         "instrument_kafka_consumer_group",
     )
-
     _process_module_definition(
         "kafka.producer.kafka",
         "newrelic.hooks.messagebroker_kafkapython",
@@ -2323,7 +2338,6 @@ def _process_module_builtin_defaults():
         "newrelic.hooks.messagebroker_kafkapython",
         "instrument_kafka_heartbeat",
     )
-
     _process_module_definition(
         "kafka.consumer.group",
         "newrelic.hooks.messagebroker_kafkapython",

newrelic/hooks/messagebroker_confluentkafka.py

Lines changed: 238 additions & 0 deletions
@@ -0,0 +1,238 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

from newrelic.api.application import application_instance
from newrelic.api.error_trace import wrap_error_trace
from newrelic.api.function_trace import FunctionTraceWrapper
from newrelic.api.message_trace import MessageTrace
from newrelic.api.message_transaction import MessageTransaction
from newrelic.api.time_trace import notice_error
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import function_wrapper, wrap_function_wrapper

_logger = logging.getLogger(__name__)

HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
HEARTBEAT_SENT = "MessageBroker/Kafka/Heartbeat/Sent"
HEARTBEAT_FAIL = "MessageBroker/Kafka/Heartbeat/Fail"
HEARTBEAT_RECEIVE = "MessageBroker/Kafka/Heartbeat/Receive"
HEARTBEAT_SESSION_TIMEOUT = "MessageBroker/Kafka/Heartbeat/SessionTimeout"
HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout"


def _bind_Producer_produce(topic, value=None, key=None, partition=-1, on_delivery=None, timestamp=0, headers=None):
    return topic, value, key, partition, on_delivery, timestamp, headers


def wrap_Producer_produce(wrapped, instance, args, kwargs):
    transaction = current_transaction()
    if transaction is None:
        return wrapped(*args, **kwargs)

    topic, value, key, partition, on_delivery, timestamp, headers = _bind_Producer_produce(*args, **kwargs)
    headers = list(headers) if headers else []

    with MessageTrace(
        library="Kafka",
        operation="Produce",
        destination_type="Topic",
        destination_name=topic or "Default",
        source=wrapped,
    ) as trace:
        dt_headers = [(k, v.encode("utf-8")) for k, v in trace.generate_request_headers(transaction)]
        headers.extend(dt_headers)
        try:
            return wrapped(
                topic,
                value=value,
                key=key,
                partition=partition,
                on_delivery=on_delivery,
                timestamp=timestamp,
                headers=headers,
            )
        except Exception as error:
            # Unwrap kafka errors
            while hasattr(error, "exception"):
                error = error.exception  # pylint: disable=E1101

            _, _, tb = sys.exc_info()
            notice_error((type(error), error, tb))
            tb = None  # Clear reference to prevent reference cycles
            raise


def wrap_Consumer_poll(wrapped, instance, args, kwargs):
    # This wrapper can be called either outside of a transaction, or
    # within the context of an existing transaction. There are 4
    # possibilities we need to handle: (Note that this is similar to
    # our Pika, Celery, and Kafka-Python instrumentation)
    #
    #   1. Inside an inner wrapper in the DeserializingConsumer
    #
    #      Do nothing. The DeserializingConsumer is double wrapped because
    #      the underlying C implementation is wrapped as well. We need to
    #      detect when the second wrapper is called and ignore it completely
    #      or transactions will be stopped early.
    #
    #   2. In an inactive transaction
    #
    #      If the end_of_transaction() or ignore_transaction() API
    #      calls have been invoked, this iterator may be called in the
    #      context of an inactive transaction. In this case, don't wrap
    #      the iterator in any way. Just run the original iterator.
    #
    #   3. In an active transaction
    #
    #      Do nothing.
    #
    #   4. Outside of a transaction
    #
    #      Since it's not running inside of an existing transaction, we
    #      want to create a new background transaction for it.

    # Step 1: Stop existing transactions
    if hasattr(instance, "_nr_transaction") and not instance._nr_transaction.stopped:
        instance._nr_transaction.__exit__(*sys.exc_info())

    # Step 2: Poll for records
    try:
        record = wrapped(*args, **kwargs)
    except Exception as e:
        if current_transaction():
            notice_error()
        else:
            notice_error(application=application_instance(activate=False))
        raise

    # Step 3: Start new transaction for received record
    if record:
        library = "Kafka"
        destination_type = "Topic"
        destination_name = record.topic()
        received_bytes = len(str(record.value()).encode("utf-8"))
        message_count = 1

        headers = record.headers()
        headers = dict(headers) if headers else {}

        transaction = current_transaction(active_only=False)
        if not transaction:
            transaction = MessageTransaction(
                application=application_instance(),
                library=library,
                destination_type=destination_type,
                destination_name=destination_name,
                headers=headers,
                transport_type="Kafka",
                routing_key=record.key(),
                source=wrapped,
            )
            instance._nr_transaction = transaction
            transaction.__enter__()  # pylint: disable=C2801

        transaction._add_agent_attribute("kafka.consume.byteCount", received_bytes)

        transaction = current_transaction()

        if transaction:  # If there is an active transaction now.
            # Add metrics whether or not a transaction was already active, or one was just started.
            # Don't add metrics if there was an inactive transaction.
            # Name the metrics using the same format as the transaction, but in case the active transaction
            # was an existing one and not a message transaction, reproduce the naming logic here.
            group = "Message/%s/%s" % (library, destination_type)
            name = "Named/%s" % destination_name
            transaction.record_custom_metric("%s/%s/Received/Bytes" % (group, name), received_bytes)
            transaction.record_custom_metric("%s/%s/Received/Messages" % (group, name), message_count)

    return record


def wrap_DeserializingConsumer_poll(wrapped, instance, args, kwargs):
    try:
        return wrapped(*args, **kwargs)
    except Exception:
        notice_error()

        # Stop existing transactions
        if hasattr(instance, "_nr_transaction") and not instance._nr_transaction.stopped:
            instance._nr_transaction.__exit__(*sys.exc_info())

        raise


def wrap_serializer(serializer_name, group_prefix):
    @function_wrapper
    def _wrap_serializer(wrapped, instance, args, kwargs):
        if not current_transaction():
            return wrapped(*args, **kwargs)

        topic = args[1].topic
        group = "%s/Kafka/Topic" % group_prefix
        name = "Named/%s/%s" % (topic, serializer_name)

        return FunctionTraceWrapper(wrapped, name=name, group=group)(*args, **kwargs)

    return _wrap_serializer


def wrap_SerializingProducer_init(wrapped, instance, args, kwargs):
    wrapped(*args, **kwargs)

    if hasattr(instance, "_key_serializer") and callable(instance._key_serializer):
        instance._key_serializer = wrap_serializer("Serialization/Key", "MessageBroker")(instance._key_serializer)

    if hasattr(instance, "_value_serializer") and callable(instance._value_serializer):
        instance._value_serializer = wrap_serializer("Serialization/Value", "MessageBroker")(instance._value_serializer)


def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs):
    wrapped(*args, **kwargs)

    if hasattr(instance, "_key_deserializer") and callable(instance._key_deserializer):
        instance._key_deserializer = wrap_serializer("Deserialization/Key", "Message")(instance._key_deserializer)

    if hasattr(instance, "_value_deserializer") and callable(instance._value_deserializer):
        instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer)


def wrap_immutable_class(module, class_name):
    # Wrap immutable binary extension class with a mutable Python subclass
    new_class = type(class_name, (getattr(module, class_name),), {})
    setattr(module, class_name, new_class)
    return new_class


def instrument_confluentkafka_cimpl(module):
    if hasattr(module, "Producer"):
        wrap_immutable_class(module, "Producer")
        wrap_function_wrapper(module, "Producer.produce", wrap_Producer_produce)

    if hasattr(module, "Consumer"):
        wrap_immutable_class(module, "Consumer")
        wrap_function_wrapper(module, "Consumer.poll", wrap_Consumer_poll)


def instrument_confluentkafka_serializing_producer(module):
    if hasattr(module, "SerializingProducer"):
        wrap_function_wrapper(module, "SerializingProducer.__init__", wrap_SerializingProducer_init)
        wrap_error_trace(module, "SerializingProducer.produce")


def instrument_confluentkafka_deserializing_consumer(module):
    if hasattr(module, "DeserializingConsumer"):
        wrap_function_wrapper(module, "DeserializingConsumer.__init__", wrap_DeserializingConsumer_init)
        wrap_function_wrapper(module, "DeserializingConsumer.poll", wrap_DeserializingConsumer_poll)
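
A hypothetical end-to-end sketch of how this instrumentation is exercised from application code, assuming a configured agent and a local broker; the topic, group id, broker address, and app setup below are placeholders, not part of this commit. wrap_Producer_produce injects distributed tracing headers on produce, and wrap_Consumer_poll starts a MessageTransaction around each record returned by poll:

    # Hypothetical usage sketch: exercising the wrapped Producer/Consumer.
    import newrelic.agent
    from confluent_kafka import Consumer, Producer

    newrelic.agent.initialize("newrelic.ini")  # registers the import hooks added in newrelic/config.py
    application = newrelic.agent.register_application(timeout=10.0)

    @newrelic.agent.background_task(application, name="produce-example")
    def produce_one():
        producer = Producer({"bootstrap.servers": "localhost:9092"})
        # wrap_Producer_produce appends distributed tracing headers before delegating.
        producer.produce("example-topic", value=b"hello")
        producer.flush()

    def consume_forever():
        consumer = Consumer({
            "bootstrap.servers": "localhost:9092",
            "group.id": "example-group",
            "auto.offset.reset": "earliest",
        })
        consumer.subscribe(["example-topic"])
        while True:
            # wrap_Consumer_poll starts a MessageTransaction for each returned record
            # when no transaction is active, and records byte/message metrics.
            record = consumer.poll(1.0)
            if record is None or record.error():
                continue
            print(record.value())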
