Skip to content

Commit 6d71a66

Browse files
lrafeeiTimPansinohmstepanekumaannamalai
committed
Add more metrics, consumer, and producer tests (#619)
* Add more metrics, consumer, and producer tests Co-authored-by: Timothy Pansino <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> * Remove breakpoint * Add DT accepted validator * Fix lint error * Fix issue with holdover transaction Co-authored-by: Lalleh Rafeei <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> * [Mega-Linter] Apply linters fixes * Fix broken producer error test * [Mega-Linter] Apply linters fixes * Bump Tests * Fix flakey test * Fix flakey test * fixup * Fix metrics tests for Python 2.7 Co-authored-by: Timothy Pansino <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> Co-authored-by: Uma Annamalai <[email protected]> Co-authored-by: Timothy Pansino <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> Co-authored-by: Tim Pansino <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> Co-authored-by: Lalleh Rafeei <[email protected]> Co-authored-by: Uma Annamalai <[email protected]>
1 parent 411b980 commit 6d71a66

File tree

7 files changed

+208
-139
lines changed

7 files changed

+208
-139
lines changed

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,15 +447,15 @@ jobs:
447447

448448
services:
449449
zookeeper:
450-
image: bitnami/zookeeper:latest
450+
image: bitnami/zookeeper:3.7
451451
env:
452452
ALLOW_ANONYMOUS_LOGIN: yes
453453

454454
ports:
455455
- 2181:2181
456456

457457
kafka:
458-
image: bitnami/kafka:latest
458+
image: bitnami/kafka:3.2
459459
ports:
460460
- 8080:8080
461461
- 8081:8081

newrelic/hooks/messagebroker_kafkapython.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs):
158158
library=library,
159159
destination_type=destination_type,
160160
destination_name=destination_name,
161-
headers=record.headers,
161+
headers=dict(record.headers),
162162
transport_type="Kafka",
163163
routing_key=record.key,
164164
source=wrapped,

tests/messagebroker_kafkapython/conftest.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555

5656

5757
@pytest.fixture(scope="function")
58-
def producer(data_source):
58+
def producer(topic, data_source):
5959
producer = kafka.KafkaProducer(
6060
bootstrap_servers=BROKER, api_version=(2, 0, 2), value_serializer=lambda v: json.dumps(v).encode("utf-8")
6161
)
@@ -64,23 +64,39 @@ def producer(data_source):
6464

6565

6666
@pytest.fixture(scope="function")
67-
def consumer(topic, data_source):
67+
def consumer(topic, data_source, producer):
6868
consumer = kafka.KafkaConsumer(
6969
topic,
7070
bootstrap_servers=BROKER,
7171
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
7272
auto_offset_reset="earliest",
73-
consumer_timeout_ms=5000,
73+
consumer_timeout_ms=500,
7474
heartbeat_interval_ms=1000,
7575
group_id="test",
7676
)
77+
# The first time the kafka consumer is created and polled, it returns a StopIterator
78+
# exception. To by-pass this, loop over the consumer before using it.
79+
# NOTE: This seems to only happen in Python2.7.
80+
for record in consumer:
81+
pass
7782
yield consumer
7883
consumer.close()
7984

8085

8186
@pytest.fixture(scope="function")
8287
def topic():
83-
yield "test-topic-%s" % str(uuid.uuid4())
88+
# from kafka.admin.client import KafkaAdminClient
89+
# from kafka.admin.new_topic import NewTopic
90+
91+
topic = "test-topic-%s" % str(uuid.uuid4())
92+
93+
# admin = KafkaAdminClient(bootstrap_servers=BROKER)
94+
# new_topics = [NewTopic(topic, num_partitions=1, replication_factor=1)]
95+
# topics = admin.create_topics(new_topics)
96+
97+
yield topic
98+
99+
# admin.delete_topics([topic])
84100

85101

86102
@pytest.fixture(scope="session")
@@ -96,7 +112,7 @@ def data_source():
96112

97113
@transient_function_wrapper(kafka.producer.kafka, "KafkaProducer.send.__wrapped__")
98114
# Place transient wrapper underneath instrumentation
99-
def cache_kafka_headers(wrapped, instance, args, kwargs):
115+
def cache_kafka_producer_headers(wrapped, instance, args, kwargs):
100116
transaction = current_transaction()
101117

102118
if transaction is None:
@@ -107,3 +123,29 @@ def cache_kafka_headers(wrapped, instance, args, kwargs):
107123
headers = dict(headers)
108124
transaction._test_request_headers = headers
109125
return ret
126+
127+
128+
@transient_function_wrapper(kafka.consumer.group, "KafkaConsumer.__next__")
129+
# Place transient wrapper underneath instrumentation
130+
def cache_kafka_consumer_headers(wrapped, instance, args, kwargs):
131+
record = wrapped(*args, **kwargs)
132+
transaction = current_transaction()
133+
134+
if transaction is None:
135+
return record
136+
137+
headers = record.headers
138+
headers = dict(headers)
139+
transaction._test_request_headers = headers
140+
return record
141+
142+
143+
@pytest.fixture(autouse=True)
144+
def assert_no_active_transaction():
145+
# Run before test
146+
assert not current_transaction(active_only=False), "Transaction exists before test run."
147+
148+
yield # Run test
149+
150+
# Run after test
151+
assert not current_transaction(active_only=False), "Transaction was not properly exited."

tests/messagebroker_kafkapython/test_kafka_consumer.py renamed to tests/messagebroker_kafkapython/test_consumer.py

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,26 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import kafka
1516
import kafka.errors as Errors
1617
import pytest
18+
from conftest import BROKER, cache_kafka_consumer_headers
1719
from testing_support.fixtures import (
1820
validate_attributes,
1921
validate_error_event_attributes_outside_transaction,
2022
validate_transaction_errors,
2123
validate_transaction_metrics,
2224
)
25+
from testing_support.validators.validate_distributed_trace_accepted import (
26+
validate_distributed_trace_accepted,
27+
)
2328

2429
from newrelic.api.background_task import background_task
2530
from newrelic.api.transaction import end_of_transaction
2631
from newrelic.packages import six
2732

2833

29-
def test_custom_metrics_are_recorded(get_consumer_records, topic):
34+
def test_custom_metrics(get_consumer_records, topic):
3035
@validate_transaction_metrics(
3136
"Named/%s" % topic,
3237
group="Message/Kafka/Topic",
@@ -42,11 +47,9 @@ def _test():
4247
_test()
4348

4449

45-
def test_custom_metrics_are_recorded_on_already_active_transaction(get_consumer_records, topic):
50+
def test_custom_metrics_on_existing_transaction(get_consumer_records, topic):
4651
transaction_name = (
47-
"test_kafka_consumer:test_custom_metrics_are_recorded_on_already_active_transaction.<locals>._test"
48-
if six.PY3
49-
else "test_kafka_consumer:_test"
52+
"test_consumer:test_custom_metrics_on_existing_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
5053
)
5154

5255
@validate_transaction_metrics(
@@ -64,11 +67,9 @@ def _test():
6467
_test()
6568

6669

67-
def test_custom_metrics_are_not_recorded_on_inactive_transaction(get_consumer_records, topic):
70+
def test_custom_metrics_inactive_transaction(get_consumer_records, topic):
6871
transaction_name = (
69-
"test_kafka_consumer:test_custom_metrics_are_not_recorded_on_inactive_transaction.<locals>._test"
70-
if six.PY3
71-
else "test_kafka_consumer:_test"
72+
"test_consumer:test_custom_metrics_inactive_transaction.<locals>._test" if six.PY3 else "test_consumer:_test"
7273
)
7374

7475
@validate_transaction_metrics(
@@ -87,15 +88,15 @@ def _test():
8788
_test()
8889

8990

90-
def test_agent_attributes_are_recorded(get_consumer_records):
91+
def test_agent_attributes(get_consumer_records):
9192
@validate_attributes("agent", ["kafka.consume.client_id", "kafka.consume.byteCount"])
9293
def _test():
9394
get_consumer_records()
9495

9596
_test()
9697

9798

98-
def test_agent_records_error_if_raised(get_consumer_records, consumer_next_raises):
99+
def test_consumer_errors(get_consumer_records, consumer_next_raises):
99100
@validate_error_event_attributes_outside_transaction(
100101
exact_attrs={"intrinsic": {"error.class": "kafka.errors:KafkaError"}}
101102
)
@@ -106,7 +107,23 @@ def _test():
106107
_test()
107108

108109

109-
def test_agent_does_not_record_error_if_not_raised(get_consumer_records):
110+
def test_consumer_deserialization_errors(topic, consumer):
111+
producer = kafka.KafkaProducer(
112+
bootstrap_servers=BROKER, api_version=(2, 0, 2), value_serializer=lambda v: str(v).encode("utf-8")
113+
) # Producer that allows us to upload invalid JSON.
114+
115+
@validate_error_event_attributes_outside_transaction(exact_attrs={"intrinsic": {"error.class": "ValueError"}})
116+
def _test():
117+
with pytest.raises(ValueError):
118+
producer.send(topic, value="%") # Invalid JSON
119+
producer.flush()
120+
for _ in consumer:
121+
pass
122+
123+
_test()
124+
125+
126+
def test_consumer_handled_errors_not_recorded(get_consumer_records):
110127
# It's important to check that we do not notice the StopIteration error.
111128
@validate_transaction_errors([])
112129
def _test():
@@ -115,6 +132,41 @@ def _test():
115132
_test()
116133

117134

135+
def test_distributed_tracing_headers(topic, producer, consumer):
136+
# Send the messages inside a transaction, making sure to close it.
137+
@background_task()
138+
def _produce():
139+
producer.send(topic, value={"foo": "bar"})
140+
producer.flush()
141+
142+
consumer_iter = iter(consumer)
143+
144+
@validate_transaction_metrics(
145+
"Named/%s" % topic,
146+
group="Message/Kafka/Topic",
147+
rollup_metrics=[
148+
("Supportability/DistributedTrace/AcceptPayload/Success", None),
149+
("Supportability/TraceContext/Accept/Success", 1),
150+
],
151+
background_task=True,
152+
)
153+
def _consume():
154+
@validate_distributed_trace_accepted(transport_type="Kafka")
155+
@cache_kafka_consumer_headers
156+
def _test():
157+
# Start the transaction but don't exit it.
158+
next(consumer_iter)
159+
160+
_test()
161+
162+
# Exit the transaction.
163+
with pytest.raises(StopIteration):
164+
next(consumer_iter)
165+
166+
_produce()
167+
_consume()
168+
169+
118170
@pytest.fixture()
119171
def get_consumer_records(topic, producer, consumer):
120172
def _test():
@@ -132,4 +184,4 @@ def _poll(*args, **kwargs):
132184
raise Errors.KafkaError()
133185

134186
consumer.poll = _poll
135-
consumer
187+
return consumer

tests/messagebroker_kafkapython/test_metrics.py

Lines changed: 16 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -12,112 +12,27 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from newrelic.packages import six
16+
1517

1618
def test_data_source_metrics(data_source, topic, producer, consumer):
19+
_data_source_metrics = {
20+
"MessageBroker/Kafka/Internal/kafka-metrics-count/count": "present",
21+
"MessageBroker/Kafka/Internal/producer-metrics/request-rate": "present",
22+
"MessageBroker/Kafka/Internal/producer-topic-metrics.%s/record-send-rate" % topic: "present",
23+
"MessageBroker/Kafka/Internal/consumer-metrics/request-rate": "present",
24+
}
25+
1726
producer.send(topic, value=1)
1827
producer.flush()
19-
next(iter(consumer))
28+
for _ in consumer:
29+
pass
2030

2131
metrics = dict(data_source())
22-
metric_names = list(metrics.keys())
2332
assert metrics
2433

25-
26-
# Example metrics
27-
28-
# MessageBroker/Kafka/Internal/kafka-metrics-count/count
29-
# MessageBroker/Kafka/Internal/producer-metrics/connection-close-rate
30-
# MessageBroker/Kafka/Internal/producer-metrics/connection-creation-rate
31-
# MessageBroker/Kafka/Internal/producer-metrics/select-rate
32-
# MessageBroker/Kafka/Internal/producer-metrics/io-wait-time-ns-avg
33-
# MessageBroker/Kafka/Internal/producer-metrics/io-wait-ratio
34-
# MessageBroker/Kafka/Internal/producer-metrics/io-time-ns-avg
35-
# MessageBroker/Kafka/Internal/producer-metrics/io-ratio
36-
# MessageBroker/Kafka/Internal/producer-metrics/connection-count
37-
# MessageBroker/Kafka/Internal/producer-metrics/batch-size-avg
38-
# MessageBroker/Kafka/Internal/producer-metrics/batch-size-max
39-
# MessageBroker/Kafka/Internal/producer-metrics/compression-rate-avg
40-
# MessageBroker/Kafka/Internal/producer-metrics/record-queue-time-avg
41-
# MessageBroker/Kafka/Internal/producer-metrics/record-queue-time-max
42-
# MessageBroker/Kafka/Internal/producer-metrics/record-send-rate
43-
# MessageBroker/Kafka/Internal/producer-metrics/records-per-request-avg
44-
# MessageBroker/Kafka/Internal/producer-metrics/byte-rate
45-
# MessageBroker/Kafka/Internal/producer-metrics/record-size-max
46-
# MessageBroker/Kafka/Internal/producer-metrics/record-size-avg
47-
# MessageBroker/Kafka/Internal/producer-metrics/metadata-age
48-
# MessageBroker/Kafka/Internal/producer-metrics/network-io-rate
49-
# MessageBroker/Kafka/Internal/producer-metrics/outgoing-byte-rate
50-
# MessageBroker/Kafka/Internal/producer-metrics/request-rate
51-
# MessageBroker/Kafka/Internal/producer-metrics/request-size-avg
52-
# MessageBroker/Kafka/Internal/producer-metrics/request-size-max
53-
# MessageBroker/Kafka/Internal/producer-metrics/incoming-byte-rate
54-
# MessageBroker/Kafka/Internal/producer-metrics/response-rate
55-
# MessageBroker/Kafka/Internal/producer-metrics/request-latency-avg
56-
# MessageBroker/Kafka/Internal/producer-metrics/request-latency-max
57-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/outgoing-byte-rate
58-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-rate
59-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-size-avg
60-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-size-max
61-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/incoming-byte-rate
62-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/response-rate
63-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-latency-avg
64-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-bootstrap-0/request-latency-max
65-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/outgoing-byte-rate
66-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-rate
67-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-size-avg
68-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-size-max
69-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/incoming-byte-rate
70-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/response-rate
71-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-latency-avg
72-
# MessageBroker/Kafka/Internal/producer-node-metrics.node-1001/request-latency-max
73-
# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/record-send-rate
74-
# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/byte-rate
75-
# MessageBroker/Kafka/Internal/producer-topic-metrics.test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/compression-rate
76-
# MessageBroker/Kafka/Internal/consumer-metrics/connection-close-rate
77-
# MessageBroker/Kafka/Internal/consumer-metrics/connection-creation-rate
78-
# MessageBroker/Kafka/Internal/consumer-metrics/select-rate
79-
# MessageBroker/Kafka/Internal/consumer-metrics/io-wait-time-ns-avg
80-
# MessageBroker/Kafka/Internal/consumer-metrics/io-wait-ratio
81-
# MessageBroker/Kafka/Internal/consumer-metrics/io-time-ns-avg
82-
# MessageBroker/Kafka/Internal/consumer-metrics/io-ratio
83-
# MessageBroker/Kafka/Internal/consumer-metrics/connection-count
84-
# MessageBroker/Kafka/Internal/consumer-metrics/network-io-rate
85-
# MessageBroker/Kafka/Internal/consumer-metrics/outgoing-byte-rate
86-
# MessageBroker/Kafka/Internal/consumer-metrics/request-rate
87-
# MessageBroker/Kafka/Internal/consumer-metrics/request-size-avg
88-
# MessageBroker/Kafka/Internal/consumer-metrics/request-size-max
89-
# MessageBroker/Kafka/Internal/consumer-metrics/incoming-byte-rate
90-
# MessageBroker/Kafka/Internal/consumer-metrics/response-rate
91-
# MessageBroker/Kafka/Internal/consumer-metrics/request-latency-avg
92-
# MessageBroker/Kafka/Internal/consumer-metrics/request-latency-max
93-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/outgoing-byte-rate
94-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-rate
95-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-size-avg
96-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-size-max
97-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/incoming-byte-rate
98-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/response-rate
99-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-latency-avg
100-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-bootstrap-0/request-latency-max
101-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-size-avg
102-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-size-max
103-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/bytes-consumed-rate
104-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-per-request-avg
105-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-consumed-rate
106-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-latency-avg
107-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-latency-max
108-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-rate
109-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-lag-max
110-
# MessageBroker/Kafka/Internal/consumer-coordinator-metrics/assigned-partitions
111-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/outgoing-byte-rate
112-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-rate
113-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-size-avg
114-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-size-max
115-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/incoming-byte-rate
116-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/response-rate
117-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-latency-avg
118-
# MessageBroker/Kafka/Internal/consumer-node-metrics.node-1001/request-latency-max
119-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/fetch-size-avg
120-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/fetch-size-max
121-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/bytes-consumed-rate
122-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/records-per-request-avg
123-
# MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/test-topic-c962647a-f6cf-4a24-a90b-40ab2364dc55/records-consumed-rate
34+
for metric_name, count in six.iteritems(_data_source_metrics):
35+
if count == "present":
36+
assert metric_name in metrics
37+
else:
38+
assert metrics[metric_name]["count"] == count, "%s:%d" % (metric_name, count)

0 commit comments

Comments
 (0)