Skip to content

Commit faabca7

Browse files
committed
Distributed Tracing tests
1 parent 926ee94 commit faabca7

File tree

5 files changed

+213
-130
lines changed

5 files changed

+213
-130
lines changed

newrelic/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4007,7 +4007,7 @@ def _process_module_builtin_defaults():
40074007
"instrument_rest_framework_decorators",
40084008
)
40094009

4010-
_process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
4010+
# _process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
40114011
_process_module_definition("celery.local", "newrelic.hooks.application_celery", "instrument_celery_local")
40124012
# _process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
40134013
# _process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")

newrelic/hooks/application_celery.py

Lines changed: 59 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ def wrap_task_call(wrapped, instance, args, kwargs):
9595

9696
elif transaction:
9797
with FunctionTrace(_name, source=_source):
98+
# Do we accept distributed tracing headers here?
99+
# request = wrapped.request
100+
# headers = getattr(request, "headers", None) or vars(request)
101+
# transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
98102
return wrapped(*args, **kwargs)
99103

100104
else:
@@ -104,14 +108,28 @@ def wrap_task_call(wrapped, instance, args, kwargs):
104108
# Headers on earlier versions of Celery may end up as attributes
105109
# on the request context instead of as custom headers. Handler this
106110
# by defaulting to using vars() if headers is not available
107-
# print(f"wrapped.request: {wrapped.request}")
108111
request = wrapped.request
109112
headers = getattr(request, "headers", None) or vars(request)
110113

111114
settings = transaction.settings
112115
if headers is not None and settings is not None:
113116
if settings.distributed_tracing.enabled:
114-
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
117+
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
118+
try:
119+
dt_headers = MessageTrace.generate_request_headers(transaction)
120+
# original_headers = kwargs.get("headers", None)
121+
if dt_headers:
122+
if not headers:
123+
wrapped.request.headers = dict(dt_headers)
124+
# kwargs["headers"] = dict(dt_headers)
125+
else:
126+
headers.update(dict(dt_headers))
127+
wrapped.request.headers = headers
128+
# wrapped.request.headers.update(dict(dt_headers))
129+
# kwargs["headers"] = dt_headers = dict(dt_headers)
130+
# dt_headers.update(dict(headers))
131+
except Exception:
132+
pass
115133
elif transaction.settings.cross_application_tracer.enabled:
116134
transaction._process_incoming_cat_headers(
117135
headers.get(MessageTrace.cat_id_key, None),
@@ -133,6 +151,7 @@ def run(self, *args, **kwargs):
133151
task = bound_args.get("task", None)
134152

135153
task = TaskWrapper(task, wrap_task_call)
154+
task.__module__ = wrapped.__module__ # Ensure module is set for monkeypatching detection
136155
bound_args["task"] = task
137156

138157
return wrapped(**bound_args)
@@ -189,7 +208,17 @@ def wrapper(wrapped, instance, args, kwargs):
189208
settings = transaction.settings
190209
if headers is not None and settings is not None:
191210
if settings.distributed_tracing.enabled:
192-
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
211+
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
212+
try:
213+
dt_headers = MessageTrace.generate_request_headers(transaction)
214+
if dt_headers:
215+
if not headers:
216+
instance.request.headers = dict(dt_headers)
217+
else:
218+
headers.update(dict(dt_headers))
219+
instance.request.headers = headers
220+
except Exception:
221+
pass
193222
elif transaction.settings.cross_application_tracer.enabled:
194223
transaction._process_incoming_cat_headers(
195224
headers.get(MessageTrace.cat_id_key, None),
@@ -237,29 +266,29 @@ def run(self, *args, **kwargs):
237266
return wrapped_task
238267

239268

240-
# This will not work with the current version of Celery
241-
# This only gets called during the async execution of a task
242-
# and the task is wrapped later in the process to accomodate
243-
# custom task classes.
244-
def wrap_Celery_send_task(wrapped, instance, args, kwargs):
245-
transaction = current_transaction()
246-
if not transaction:
247-
return wrapped(*args, **kwargs)
248-
249-
# Merge distributed tracing headers into outgoing task headers
250-
try:
251-
dt_headers = MessageTrace.generate_request_headers(transaction)
252-
original_headers = kwargs.get("headers", None)
253-
if dt_headers:
254-
if not original_headers:
255-
kwargs["headers"] = dict(dt_headers)
256-
else:
257-
kwargs["headers"] = dt_headers = dict(dt_headers)
258-
dt_headers.update(dict(original_headers))
259-
except Exception:
260-
pass
261-
262-
return wrapped(*args, **kwargs)
269+
# # This will not work with the current version of Celery
270+
# # This only gets called during the async execution of a task
271+
# # and the task is wrapped later in the process to accomodate
272+
# # custom task classes.
273+
# def wrap_Celery_send_task(wrapped, instance, args, kwargs):
274+
# transaction = current_transaction()
275+
# if not transaction:
276+
# return wrapped(*args, **kwargs)
277+
278+
# # Merge distributed tracing headers into outgoing task headers
279+
# try:
280+
# dt_headers = MessageTrace.generate_request_headers(transaction)
281+
# original_headers = kwargs.get("headers", None)
282+
# if dt_headers:
283+
# if not original_headers:
284+
# kwargs["headers"] = dict(dt_headers)
285+
# else:
286+
# kwargs["headers"] = dt_headers = dict(dt_headers)
287+
# dt_headers.update(dict(original_headers))
288+
# except Exception:
289+
# pass
290+
291+
# return wrapped(*args, **kwargs)
263292

264293

265294
def wrap_worker_optimizations(wrapped, instance, args, kwargs):
@@ -290,9 +319,9 @@ def instrument_celery_local(module):
290319
module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)
291320

292321

293-
def instrument_celery_app_base(module):
294-
if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
295-
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
322+
# def instrument_celery_app_base(module):
323+
# if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
324+
# wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
296325

297326

298327
def instrument_celery_worker(module):
@@ -347,4 +376,5 @@ def instrument_celery_app_trace(module):
347376

348377
if hasattr(module, "build_tracer"):
349378
wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
379+
350380

tests/application_celery/test_application.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
@validate_transaction_metrics(
2525
name="test_application:test_celery_task_as_function_trace",
2626
scoped_metrics=[("Function/_target_application.add", 1)],
27+
# scoped_metrics=[("Function/_target_application.add", 2)],
2728
background_task=True,
2829
)
2930
@validate_code_level_metrics("_target_application", "add")
31+
# @validate_code_level_metrics("_target_application", "add", count=2)
3032
@background_task()
3133
def test_celery_task_as_function_trace():
3234
"""

tests/application_celery/test_distributed_tracing.py

Lines changed: 93 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,52 +12,103 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from _target_application import add, assert_dt
15+
import pytest
16+
from _target_application import add
1617
from testing_support.fixtures import override_application_settings
1718
from testing_support.validators.validate_transaction_count import validate_transaction_count
1819
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
1920

2021
from newrelic.api.background_task import background_task
2122

2223

23-
@validate_transaction_metrics(
24-
name="_target_application.assert_dt",
25-
group="Celery",
26-
rollup_metrics=[
27-
("Supportability/DistributedTrace/AcceptPayload/Success", None),
28-
("Supportability/TraceContext/Accept/Success", 1),
29-
],
30-
background_task=True,
31-
index=-2,
32-
)
33-
@validate_transaction_metrics(
34-
name="test_distributed_tracing:test_celery_task_distributed_tracing_enabled", background_task=True
35-
)
36-
@validate_transaction_count(2)
37-
@background_task()
38-
def test_celery_task_distributed_tracing_enabled():
39-
result = assert_dt.apply_async()
40-
result = result.get()
41-
assert result == 1
42-
43-
44-
@override_application_settings({"distributed_tracing.enabled": False})
45-
@validate_transaction_metrics(
46-
name="_target_application.add",
47-
group="Celery",
48-
rollup_metrics=[
49-
("Supportability/DistributedTrace/AcceptPayload/Success", None),
50-
("Supportability/TraceContext/Accept/Success", None), # No trace context should be accepted
51-
],
52-
background_task=True,
53-
index=-2,
54-
)
55-
@validate_transaction_metrics(
56-
name="test_distributed_tracing:test_celery_task_distributed_tracing_disabled", background_task=True
57-
)
58-
@validate_transaction_count(2)
59-
@background_task()
60-
def test_celery_task_distributed_tracing_disabled():
61-
result = add.apply_async((1, 2))
62-
result = result.get()
63-
assert result == 3
24+
@pytest.mark.parametrize("dt_enabled", [True, False])
25+
def test_celery_task_distributed_tracing_inside_background_task(dt_enabled):
26+
@override_application_settings({"distributed_tracing.enabled": dt_enabled})
27+
@validate_transaction_metrics(
28+
name="_target_application.add",
29+
group="Celery",
30+
rollup_metrics=[
31+
("Supportability/TraceContext/Accept/Success", 1 if dt_enabled else None),
32+
("Supportability/TraceContext/TraceParent/Accept/Success", 1 if dt_enabled else None),
33+
],
34+
background_task=True,
35+
index=-2,
36+
)
37+
@validate_transaction_metrics(
38+
name="test_distributed_tracing:test_celery_task_distributed_tracing_inside_background_task.<locals>._test",
39+
rollup_metrics=[
40+
("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
41+
("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
42+
],
43+
background_task=True,
44+
)
45+
@validate_transaction_count(2) # One for the background task, one for the Celery task. Runs in different processes.
46+
@background_task()
47+
def _test():
48+
result = add.apply_async((1, 2))
49+
result = result.get()
50+
assert result == 3
51+
52+
_test()
53+
54+
55+
@pytest.mark.parametrize("dt_enabled", [True, False])
56+
def test_celery_task_distributed_tracing_outside_background_task(dt_enabled):
57+
@override_application_settings({"distributed_tracing.enabled": dt_enabled})
58+
@validate_transaction_metrics(
59+
name="_target_application.add",
60+
group="Celery",
61+
rollup_metrics=[
62+
("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
63+
("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
64+
],
65+
background_task=True,
66+
)
67+
@validate_transaction_count(1)
68+
def _test():
69+
result = add.apply_async((1, 2))
70+
result = result.get()
71+
assert result == 3
72+
73+
_test()
74+
75+
76+
# In this case, the background task creating the transaction
77+
# has not generated a distributed trace header, so the Celery
78+
# task will not have a distributed trace header to accept.
79+
@pytest.mark.parametrize("dt_enabled", [True, False])
80+
def test_celery_task_distributed_tracing_inside_background_task_apply(dt_enabled):
81+
@override_application_settings({"distributed_tracing.enabled": dt_enabled})
82+
@validate_transaction_metrics(
83+
name="test_distributed_tracing:test_celery_task_distributed_tracing_inside_background_task_apply.<locals>._test",
84+
background_task=True,
85+
)
86+
@validate_transaction_count(1) # In the same process, so only one transaction
87+
@background_task()
88+
def _test():
89+
result = add.apply((1, 2))
90+
result = result.get()
91+
assert result == 3
92+
93+
_test()
94+
95+
96+
@pytest.mark.parametrize("dt_enabled", [True, False])
97+
def test_celery_task_distributed_tracing_outside_background_task_apply(dt_enabled):
98+
@override_application_settings({"distributed_tracing.enabled": dt_enabled})
99+
@validate_transaction_metrics(
100+
name="_target_application.add",
101+
group="Celery",
102+
rollup_metrics=[
103+
("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
104+
("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
105+
],
106+
background_task=True,
107+
)
108+
@validate_transaction_count(1) # In the same process, so only one transaction
109+
def _test():
110+
result = add.apply((1, 2))
111+
result = result.get()
112+
assert result == 3
113+
114+
_test()

0 commit comments

Comments
 (0)