
Commit ef56ff7

TimPansino, hmstepanek, lrafeei, and umaannamalai committed
Kafka Producer Instrumentation (#612)
* Basic kafka producer instrumentation. Co-authored-by: Lalleh Rafeei <[email protected]>
* Add testing for kafka producer. Co-authored-by: Lalleh Rafeei <[email protected]>
* Add producer test for notice_error & cleanup. Co-authored-by: Lalleh Rafeei <[email protected]>
* Fix py2 naming
* Fix lint error
* Kafka Heartbeat (#614)
* Add Kafka test infra.
* Update tests.yml.
* Basic kafka producer instrumentation. Co-authored-by: Lalleh Rafeei <[email protected]>
* Add testing for kafka producer. Co-authored-by: Lalleh Rafeei <[email protected]>
* Kafka Heartbeat instrumentation
* Remove changes not related to heartbeat

Co-authored-by: Uma Annamalai <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Hannah Stepanek <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
Co-authored-by: Uma Annamalai <[email protected]>
Co-authored-by: Lalleh Rafeei <[email protected]>
1 parent ec91e03 commit ef56ff7

File tree

3 files changed: +99 −12 lines


newrelic/config.py

Lines changed: 5 additions & 0 deletions
@@ -2307,6 +2307,11 @@ def _process_module_builtin_defaults():
         "instrument_cherrypy__cptree",
     )
 
+    _process_module_definition(
+        "kafka.producer.kafka",
+        "newrelic.hooks.messagebroker_kafkapython",
+        "instrument_kafka_producer",
+    )
     _process_module_definition(
         "kafka.coordinator.heartbeat",
         "newrelic.hooks.messagebroker_kafkapython",

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 37 additions & 0 deletions
@@ -13,6 +13,9 @@
 # limitations under the License.
 
 from newrelic.api.application import application_instance
+from newrelic.api.message_trace import MessageTrace
+from newrelic.api.time_trace import notice_error
+from newrelic.api.transaction import current_transaction
 from newrelic.common.object_wrapper import wrap_function_wrapper
 
 HEARTBEAT_POLL = "MessageBroker/Kafka/Heartbeat/Poll"
@@ -23,6 +26,40 @@
 HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout"
 
 
+def _bind_send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
+    return topic, value, key, headers, partition, timestamp_ms
+
+
+def wrap_KafkaProducer_send(wrapped, instance, args, kwargs):
+    transaction = current_transaction()
+
+    if transaction is None:
+        return wrapped(*args, **kwargs)
+
+    topic, value, key, headers, partition, timestamp_ms = _bind_send(*args, **kwargs)
+    headers = list(headers) if headers else []
+
+    with MessageTrace(
+        library="Kafka",
+        operation="Produce",
+        destination_type="Topic",
+        destination_name=topic or "Default",
+        source=wrapped,
+    ) as trace:
+        dt_headers = [(k, v.encode("utf-8")) for k, v in trace.generate_request_headers(transaction)]
+        headers.extend(dt_headers)
+        try:
+            return wrapped(topic, value=value, key=key, headers=headers, partition=partition, timestamp_ms=timestamp_ms)
+        except Exception:
+            notice_error()
+            raise
+
+
+def instrument_kafka_producer(module):
+    if hasattr(module, "KafkaProducer"):
+        wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send)
+
+
 def metric_wrapper(metric_name, check_result=False):
     def _metric_wrapper(wrapped, instance, args, kwargs):
         result = wrapped(*args, **kwargs)
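To see the hook in action, a minimal producer script like the one below would do. This is a sketch, not part of the commit: it assumes the agent is configured via a newrelic.ini file, a broker is reachable at localhost:9092, and the transaction and topic names are made up for illustration. Inside the background task, KafkaProducer.send is wrapped, so the call is timed as a MessageTrace and distributed-tracing headers are appended to the outgoing message's header list; outside a transaction the wrapper is a pass-through.

# Sketch only: assumes newrelic.ini exists and a Kafka broker runs on localhost:9092.
import newrelic.agent
newrelic.agent.initialize("newrelic.ini")
newrelic.agent.register_application(timeout=10.0)

from kafka import KafkaProducer
from newrelic.api.background_task import background_task

@background_task(name="produce-example")
def produce():
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    # The wrapper records MessageBroker/Kafka/Topic/Produce/Named/example-topic
    # and injects distributed-tracing headers into the message headers.
    producer.send("example-topic", value=b"hello")
    producer.flush()

if __name__ == "__main__":
    produce()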

tests/messagebroker_kafkapython/test_kafka_produce.py

Lines changed: 57 additions & 12 deletions
@@ -12,19 +12,64 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
+import pytest
+from conftest import cache_kafka_headers
+from testing_support.fixtures import (
+    validate_non_transaction_error_event,
+    validate_transaction_metrics,
+)
+from testing_support.validators.validate_messagebroker_headers import (
+    validate_messagebroker_headers,
+)
 
+from newrelic.api.background_task import background_task
+from newrelic.packages import six
 
-def test_no_harm(topic, producer, consumer):
-    MESSAGES = [
-        {"foo": "bar"},
-        {"baz": "bat"},
-    ]
 
-    for msg in MESSAGES:
-        time.sleep(1)
-        producer.send(topic, value=msg)
-    producer.flush()
+def test_producer_records_trace(topic, send_producer_messages):
+    scoped_metrics = [("MessageBroker/Kafka/Topic/Produce/Named/%s" % topic, 3)]
+    unscoped_metrics = scoped_metrics
+    txn_name = "test_kafka_produce:test_producer_records_trace.<locals>.test" if six.PY3 else "test_kafka_produce:test"
 
-    for msg in consumer:
-        assert msg.topic == topic
+    @validate_transaction_metrics(
+        txn_name,
+        scoped_metrics=scoped_metrics,
+        rollup_metrics=unscoped_metrics,
+        background_task=True,
+    )
+    @background_task()
+    @cache_kafka_headers
+    @validate_messagebroker_headers
+    def test():
+        send_producer_messages()
+
+    test()
+
+
+def test_producer_records_error_if_raised(topic, producer):
+    _intrinsic_attributes = {
+        "error.class": "AssertionError",
+        "error.message": "Need at least one: key or value",
+        "error.expected": False,
+    }
+
+    @validate_non_transaction_error_event(_intrinsic_attributes)
+    @background_task()
+    def test():
+        producer.send(topic, None)
+        producer.flush()
+
+    with pytest.raises(AssertionError):
+        test()
+
+
+@pytest.fixture
+def send_producer_messages(topic, producer):
+    def _test():
+        messages = [1, 2, 3]
+        for message in messages:
+            producer.send(topic, message)
+
+        producer.flush()
+
+    return _test
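The topic, producer, and cache_kafka_headers fixtures referenced above come from the test suite's conftest.py, which is not part of this diff. Purely as a hypothetical sketch of what the first two might look like (the broker address, unique-topic scheme, and JSON serializer are assumptions, not taken from this commit):

# Hypothetical conftest.py fixtures -- not part of this commit.
import json
import uuid

import pytest
from kafka import KafkaProducer

@pytest.fixture
def topic():
    # Unique topic per test run so repeated runs do not interfere.
    return "test-topic-%s" % uuid.uuid4()

@pytest.fixture
def producer():
    # JSON serializer lets the tests send plain Python values like 1, 2, 3.
    p = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    yield p
    p.close()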

0 commit comments
