Skip to content

Commit 926ee94

Browse files
committed
Initial commit
1 parent 1c0166f commit 926ee94

File tree

2 files changed

+86
-117
lines changed

2 files changed

+86
-117
lines changed

newrelic/config.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4013,18 +4013,9 @@ def _process_module_builtin_defaults():
40134013
# _process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
40144014
_process_module_definition("celery.app.trace", "newrelic.hooks.application_celery", "instrument_celery_app_trace")
40154015
_process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
4016-
# _process_module_definition(
4017-
# "celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
4018-
# )
40194016
_process_module_definition(
40204017
"celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
40214018
)
4022-
# _process_module_definition(
4023-
# "celery.concurrency.asynpool", "newrelic.hooks.application_celery", "instrument_celery_worker"
4024-
# )
4025-
_process_module_definition(
4026-
"billiard.process", "newrelic.hooks.application_celery", "instrument_billiard_process"
4027-
)
40284019

40294020
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")
40304021

newrelic/hooks/application_celery.py

Lines changed: 86 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from newrelic.api.function_trace import FunctionTrace
2828
from newrelic.api.message_trace import MessageTrace
2929
from newrelic.api.pre_function import wrap_pre_function
30-
from newrelic.api.post_function import wrap_post_function
3130
from newrelic.api.transaction import current_transaction
3231
from newrelic.common.object_wrapper import FunctionWrapper, _NRBoundFunctionWrapper, wrap_function_wrapper
3332
from newrelic.common.signature import bind_args
@@ -64,6 +63,84 @@ def task_info(instance, *args, **kwargs):
6463
return task_name, task_source
6564

6665

66+
def wrap_task_call(wrapped, instance, args, kwargs):
67+
transaction = current_transaction(active_only=False)
68+
69+
# Grab task name and source
70+
_name, _source = task_info(wrapped, *args, **kwargs)
71+
72+
# A Celery Task can be called either outside of a transaction, or
73+
# within the context of an existing transaction. There are 3
74+
# possibilities we need to handle:
75+
#
76+
# 1. In an inactive transaction
77+
#
78+
# If the end_of_transaction() or ignore_transaction() API calls
79+
# have been invoked, this task may be called in the context
80+
# of an inactive transaction. In this case, don't wrap the task
81+
# in any way. Just run the original function.
82+
#
83+
# 2. In an active transaction
84+
#
85+
# Run the original function inside a FunctionTrace.
86+
#
87+
# 3. Outside of a transaction
88+
#
89+
# This is the typical case for a celery Task. Since it's not
90+
# running inside of an existing transaction, we want to create
91+
# a new background transaction for it.
92+
93+
if transaction and (transaction.ignore_transaction or transaction.stopped):
94+
return wrapped(*args, **kwargs)
95+
96+
elif transaction:
97+
with FunctionTrace(_name, source=_source):
98+
return wrapped(*args, **kwargs)
99+
100+
else:
101+
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
102+
# Attempt to grab distributed tracing headers
103+
try:
104+
# Headers on earlier versions of Celery may end up as attributes
105+
# on the request context instead of as custom headers. Handler this
106+
# by defaulting to using vars() if headers is not available
107+
# print(f"wrapped.request: {wrapped.request}")
108+
request = wrapped.request
109+
headers = getattr(request, "headers", None) or vars(request)
110+
111+
settings = transaction.settings
112+
if headers is not None and settings is not None:
113+
if settings.distributed_tracing.enabled:
114+
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
115+
elif transaction.settings.cross_application_tracer.enabled:
116+
transaction._process_incoming_cat_headers(
117+
headers.get(MessageTrace.cat_id_key, None),
118+
headers.get(MessageTrace.cat_transaction_key, None),
119+
)
120+
except Exception:
121+
pass
122+
123+
return wrapped(*args, **kwargs)
124+
125+
126+
def wrap_build_tracer(wrapped, instance, args, kwargs):
127+
class TaskWrapper(FunctionWrapper):
128+
def run(self, *args, **kwargs):
129+
return self.__call__(*args, **kwargs)
130+
131+
try:
132+
bound_args = bind_args(wrapped, args, kwargs)
133+
task = bound_args.get("task", None)
134+
135+
task = TaskWrapper(task, wrap_task_call)
136+
bound_args["task"] = task
137+
138+
return wrapped(**bound_args)
139+
except:
140+
# If we can't bind the args, we just call the wrapped function
141+
return wrapped(*args, **kwargs)
142+
143+
67144
def CeleryTaskWrapper(wrapped):
68145
def wrapper(wrapped, instance, args, kwargs):
69146
transaction = current_transaction(active_only=False)
@@ -158,8 +235,12 @@ def run(self, *args, **kwargs):
158235
wrapped_task.__module__ = CeleryTaskWrapper.__module__
159236

160237
return wrapped_task
238+
161239

162-
240+
# This will not work with the current version of Celery
241+
# This only gets called during the async execution of a task
242+
# and the task is wrapped later in the process to accomodate
243+
# custom task classes.
163244
def wrap_Celery_send_task(wrapped, instance, args, kwargs):
164245
transaction = current_transaction()
165246
if not transaction:
@@ -201,98 +282,6 @@ def wrap_worker_optimizations(wrapped, instance, args, kwargs):
201282
return result
202283

203284

204-
def wrap_task_trace_wrapper(wrapped, instance, args, kwargs):
205-
bound_args = bind_args(wrapped, args, kwargs)
206-
task_name = bound_args.get("task", None)
207-
localized_args = bound_args.get("_loc", None) or wrapped.__globals__['_localized']
208-
try:
209-
tasks, accept, hostname = localized_args if localized_args else (None, None, None)
210-
task_function = tasks.get(task_name, None)
211-
tasks[task_name] = task_function
212-
bound_args["_loc"] = (tasks, accept, hostname)
213-
transaction = current_transaction(active_only=False)
214-
215-
# Grab task name and source
216-
_name = task_name
217-
_source = task_function
218-
219-
# A Celery Task can be called either outside of a transaction, or
220-
# within the context of an existing transaction. There are 3
221-
# possibilities we need to handle:
222-
#
223-
# 1. In an inactive transaction
224-
#
225-
# If the end_of_transaction() or ignore_transaction() API calls
226-
# have been invoked, this task may be called in the context
227-
# of an inactive transaction. In this case, don't wrap the task
228-
# in any way. Just run the original function.
229-
#
230-
# 2. In an active transaction
231-
#
232-
# Run the original function inside a FunctionTrace.
233-
#
234-
# 3. Outside of a transaction
235-
#
236-
# This is the typical case for a celery Task. Since it's not
237-
# running inside of an existing transaction, we want to create
238-
# a new background transaction for it.
239-
240-
if transaction and (transaction.ignore_transaction or transaction.stopped):
241-
return wrapped(*args, **kwargs)
242-
243-
elif transaction:
244-
with FunctionTrace(_name, source=_source):
245-
return wrapped(*args, **kwargs)
246-
247-
else:
248-
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
249-
# Attempt to grab distributed tracing headers
250-
try:
251-
# Headers on earlier versions of Celery may end up as attributes
252-
# on the request context instead of as custom headers. Handler this
253-
# by defaulting to using vars() if headers is not available
254-
255-
request = task_function.request
256-
headers = getattr(request, "headers", None) or vars(request)
257-
258-
settings = transaction.settings
259-
if headers is not None and settings is not None:
260-
if settings.distributed_tracing.enabled:
261-
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
262-
elif transaction.settings.cross_application_tracer.enabled:
263-
transaction._process_incoming_cat_headers(
264-
headers.get(MessageTrace.cat_id_key, None),
265-
headers.get(MessageTrace.cat_transaction_key, None),
266-
)
267-
except Exception:
268-
pass
269-
270-
return wrapped(*args, **kwargs)
271-
272-
except Exception as e:
273-
return wrapped(*args, **kwargs)
274-
275-
276-
def wrap_request_wrapper(wrapped, instance, args, kwargs):
277-
try:
278-
result = wrapped(*args, **kwargs)
279-
ready, request = result
280-
job, i, task_tracer_function, _args, _kwargs = request
281-
task_tracer_function = FunctionWrapper(task_tracer_function, wrap_task_trace_wrapper)
282-
request = (job, i, task_tracer_function, _args, _kwargs)
283-
result = (ready, request)
284-
except Exception as e:
285-
return wrapped(*args, **kwargs)
286-
287-
return result
288-
289-
290-
def wrap_protected_receive_wrapper(wrapped, instance, args, kwargs):
291-
receive_function = wrapped(*args, **kwargs)
292-
wrapped_receive_function = FunctionWrapper(receive_function, wrap_request_wrapper)
293-
return wrapped_receive_function
294-
295-
296285
def instrument_celery_local(module):
297286
if hasattr(module, "Proxy"):
298287
# This is used in the case where the function is
@@ -331,13 +320,6 @@ def process_destructor(*args, **kwargs):
331320
return _process_destructor(*args, **kwargs)
332321

333322
module.process_destructor = process_destructor
334-
335-
336-
def instrument_billiard_process(module):
337-
def force_application_activation(*args, **kwargs):
338-
application_instance().activate()
339-
340-
wrap_pre_function(module, "Process.run", force_application_activation)
341323

342324

343325
def instrument_celery_loaders_base(module):
@@ -351,17 +333,10 @@ def instrument_billiard_pool(module):
351333
def force_agent_shutdown(*args, **kwargs):
352334
shutdown_agent()
353335

354-
if hasattr(module, "Worker") and hasattr(module.Worker, "_make_protected_receive"):
355-
# `_make_protected_receive` returns the `receive` function
356-
# `receive` returns a tuple of (`ready`, `request`)
357-
# where `request` is a tuple of (job, i, task_tracer_function, _args, _kwargs)
358-
# and `task_tracer_function` is a function that executes the task
359-
wrap_function_wrapper(module, "Worker._make_protected_receive", wrap_protected_receive_wrapper)
360336
if hasattr(module, "Worker") and hasattr(module.Worker, "_do_exit"):
361337
wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)
362338

363339

364-
365340
def instrument_celery_app_trace(module):
366341
# Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
367342
if hasattr(module, "setup_worker_optimizations"):
@@ -370,3 +345,6 @@ def instrument_celery_app_trace(module):
370345
if hasattr(module, "reset_worker_optimizations"):
371346
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
372347

348+
if hasattr(module, "build_tracer"):
349+
wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
350+

0 commit comments

Comments
 (0)