Skip to content

Commit 7f2493c

Browse files
committed
Add comments and shuffle code
1 parent 11cbdbf commit 7f2493c

File tree

1 file changed

+97
-88
lines changed

1 file changed

+97
-88
lines changed

newrelic/hooks/application_celery.py

Lines changed: 97 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -63,94 +63,8 @@ def task_info(instance, *args, **kwargs):
6363

6464
return task_name, task_source
6565

66-
67-
def wrap_task_call(wrapped, instance, args, kwargs):
68-
transaction = current_transaction(active_only=False)
69-
70-
# Grab task name and source
71-
_name, _source = task_info(wrapped, *args, **kwargs)
72-
73-
# A Celery Task can be called either outside of a transaction, or
74-
# within the context of an existing transaction. There are 3
75-
# possibilities we need to handle:
76-
#
77-
# 1. In an inactive transaction
78-
#
79-
# If the end_of_transaction() or ignore_transaction() API calls
80-
# have been invoked, this task may be called in the context
81-
# of an inactive transaction. In this case, don't wrap the task
82-
# in any way. Just run the original function.
83-
#
84-
# 2. In an active transaction
85-
#
86-
# Run the original function inside a FunctionTrace.
87-
#
88-
# 3. Outside of a transaction
89-
#
90-
# This is the typical case for a celery Task. Since it's not
91-
# running inside of an existing transaction, we want to create
92-
# a new background transaction for it.
93-
94-
if transaction and (transaction.ignore_transaction or transaction.stopped):
95-
return wrapped(*args, **kwargs)
96-
97-
elif transaction:
98-
with FunctionTrace(_name, source=_source):
99-
return wrapped(*args, **kwargs)
100-
101-
else:
102-
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
103-
# Attempt to grab distributed tracing headers
104-
try:
105-
# Headers on earlier versions of Celery may end up as attributes
106-
# on the request context instead of as custom headers. Handle this
107-
# by defaulting to using `vars()` if headers is not available
108-
109-
# If there is no request, the request property will return
110-
# a new instance of `celery.Context()` instead of `None`, so
111-
# this will be handled by accessing the request_stack directly.
112-
request = wrapped and wrapped.request_stack and wrapped.request_stack.top
113-
headers = getattr(request, "headers", None) or vars(request)
114-
115-
settings = transaction.settings
116-
if headers is not None and settings is not None:
117-
if settings.distributed_tracing.enabled:
118-
# Generate DT headers if they do not already exist in the incoming request
119-
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
120-
try:
121-
dt_headers = MessageTrace.generate_request_headers(transaction)
122-
if dt_headers:
123-
headers.update(dict(dt_headers))
124-
except Exception:
125-
pass
126-
elif transaction.settings.cross_application_tracer.enabled:
127-
transaction._process_incoming_cat_headers(
128-
headers.get(MessageTrace.cat_id_key, None),
129-
headers.get(MessageTrace.cat_transaction_key, None),
130-
)
131-
except Exception:
132-
pass
133-
134-
return wrapped(*args, **kwargs)
135-
136-
137-
def wrap_build_tracer(wrapped, instance, args, kwargs):
138-
class TaskWrapper(FunctionWrapper):
139-
def run(self, *args, **kwargs):
140-
return self.__call__(*args, **kwargs)
141-
142-
try:
143-
bound_args = bind_args(wrapped, args, kwargs)
144-
task = bound_args.get("task", None)
145-
146-
task = TaskWrapper(task, wrap_task_call)
147-
bound_args["task"] = task
148-
149-
return wrapped(**bound_args)
150-
except:
151-
# If we can't bind the args, we just call the wrapped function
152-
return wrapped(*args, **kwargs)
153-
66+
# =============
67+
# Celery instrumentation for direct task calls (__call__ or run)
15468

15569
def CeleryTaskWrapper(wrapped):
15670
def wrapper(wrapped, instance, args, kwargs):
@@ -253,6 +167,7 @@ def run(self, *args, **kwargs):
253167
return self.__call__(*args, **kwargs)
254168

255169
wrapped_task = TaskWrapper(wrapped, wrapper)
170+
# Reset __module__ to be less transparent so celery detects our monkey-patching
256171
wrapped_task.__module__ = CeleryTaskWrapper.__module__
257172

258173
return wrapped_task
@@ -265,6 +180,100 @@ def instrument_celery_local(module):
265180
# using `delay` or `apply_async`)
266181
module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)
267182

183+
# =============
184+
185+
# =============
186+
# Celery Instrumentation for delay/apply_async/apply:
187+
188+
def wrap_task_call(wrapped, instance, args, kwargs):
189+
transaction = current_transaction(active_only=False)
190+
191+
# Grab task name and source
192+
_name, _source = task_info(wrapped, *args, **kwargs)
193+
194+
# A Celery Task can be called either outside of a transaction, or
195+
# within the context of an existing transaction. There are 3
196+
# possibilities we need to handle:
197+
#
198+
# 1. In an inactive transaction
199+
#
200+
# If the end_of_transaction() or ignore_transaction() API calls
201+
# have been invoked, this task may be called in the context
202+
# of an inactive transaction. In this case, don't wrap the task
203+
# in any way. Just run the original function.
204+
#
205+
# 2. In an active transaction
206+
#
207+
# Run the original function inside a FunctionTrace.
208+
#
209+
# 3. Outside of a transaction
210+
#
211+
# This is the typical case for a celery Task. Since it's not
212+
# running inside of an existing transaction, we want to create
213+
# a new background transaction for it.
214+
215+
if transaction and (transaction.ignore_transaction or transaction.stopped):
216+
return wrapped(*args, **kwargs)
217+
218+
elif transaction:
219+
with FunctionTrace(_name, source=_source):
220+
return wrapped(*args, **kwargs)
221+
222+
else:
223+
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
224+
# Attempt to grab distributed tracing headers
225+
try:
226+
# Headers on earlier versions of Celery may end up as attributes
227+
# on the request context instead of as custom headers. Handle this
228+
# by defaulting to using `vars()` if headers is not available
229+
230+
# If there is no request, the request property will return
231+
# a new instance of `celery.Context()` instead of `None`, so
232+
# this will be handled by accessing the request_stack directly.
233+
request = wrapped and wrapped.request_stack and wrapped.request_stack.top
234+
headers = getattr(request, "headers", None) or vars(request)
235+
236+
settings = transaction.settings
237+
if headers is not None and settings is not None:
238+
if settings.distributed_tracing.enabled:
239+
# Generate DT headers if they do not already exist in the incoming request
240+
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
241+
try:
242+
dt_headers = MessageTrace.generate_request_headers(transaction)
243+
if dt_headers:
244+
headers.update(dict(dt_headers))
245+
except Exception:
246+
pass
247+
elif transaction.settings.cross_application_tracer.enabled:
248+
transaction._process_incoming_cat_headers(
249+
headers.get(MessageTrace.cat_id_key, None),
250+
headers.get(MessageTrace.cat_transaction_key, None),
251+
)
252+
except Exception:
253+
pass
254+
255+
return wrapped(*args, **kwargs)
256+
257+
258+
def wrap_build_tracer(wrapped, instance, args, kwargs):
259+
class TaskWrapper(FunctionWrapper):
260+
def run(self, *args, **kwargs):
261+
return self.__call__(*args, **kwargs)
262+
263+
try:
264+
bound_args = bind_args(wrapped, args, kwargs)
265+
task = bound_args.get("task", None)
266+
267+
task = TaskWrapper(task, wrap_task_call)
268+
# Reset __module__ to be less transparent so celery detects our monkey-patching
269+
task.__module__ = wrap_task_call.__module__
270+
bound_args["task"] = task
271+
272+
return wrapped(**bound_args)
273+
except:
274+
# If we can't bind the args, we just call the wrapped function
275+
return wrapped(*args, **kwargs)
276+
268277

269278
def instrument_celery_worker(module):
270279
if hasattr(module, "process_initializer"):

0 commit comments

Comments
 (0)