Skip to content

Commit 1c0166f

Browse files
committed
Add support for custom task classes
1 parent 9aa6de6 commit 1c0166f

File tree

2 files changed

+145
-38
lines changed

2 files changed

+145
-38
lines changed

newrelic/config.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4007,18 +4007,25 @@ def _process_module_builtin_defaults():
40074007
"instrument_rest_framework_decorators",
40084008
)
40094009

4010-
_process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
4011-
_process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
4010+
_process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
4011+
_process_module_definition("celery.local", "newrelic.hooks.application_celery", "instrument_celery_local")
4012+
# _process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
4013+
# _process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
40124014
_process_module_definition("celery.app.trace", "newrelic.hooks.application_celery", "instrument_celery_app_trace")
40134015
_process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
4016+
# _process_module_definition(
4017+
# "celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
4018+
# )
40144019
_process_module_definition(
4015-
"celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
4020+
"celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
40164021
)
4022+
# _process_module_definition(
4023+
# "celery.concurrency.asynpool", "newrelic.hooks.application_celery", "instrument_celery_worker"
4024+
# )
40174025
_process_module_definition(
4018-
"celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
4026+
"billiard.process", "newrelic.hooks.application_celery", "instrument_billiard_process"
40194027
)
40204028

4021-
_process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
40224029
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")
40234030

40244031
_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")

newrelic/hooks/application_celery.py

Lines changed: 133 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@
2727
from newrelic.api.function_trace import FunctionTrace
2828
from newrelic.api.message_trace import MessageTrace
2929
from newrelic.api.pre_function import wrap_pre_function
30+
from newrelic.api.post_function import wrap_post_function
3031
from newrelic.api.transaction import current_transaction
3132
from newrelic.common.object_wrapper import FunctionWrapper, _NRBoundFunctionWrapper, wrap_function_wrapper
33+
from newrelic.common.signature import bind_args
3234
from newrelic.core.agent import shutdown_agent
3335

3436
UNKNOWN_TASK_NAME = "<Unknown Task>"
3537
MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"}
3638

37-
3839
def task_info(instance, *args, **kwargs):
3940
# Grab the current task, which can be located in either place
4041
if instance:
@@ -159,28 +160,6 @@ def run(self, *args, **kwargs):
159160
return wrapped_task
160161

161162

162-
def instrument_celery_app_task(module):
163-
# Triggered for both 'celery.app.task' and 'celery.task.base'.
164-
165-
if hasattr(module, "BaseTask"):
166-
# Need to add a wrapper for background task entry point.
167-
168-
# In Celery 2.2 the 'BaseTask' class actually resided in the
169-
# module 'celery.task.base'. In Celery 2.3 the 'BaseTask' class
170-
# moved to 'celery.app.task' but an alias to it was retained in
171-
# the module 'celery.task.base'. We need to detect both module
172-
# imports, but we check the module name associated with
173-
# 'BaseTask' to ensure that we do not instrument the class via
174-
# the alias in Celery 2.3 and later.
175-
176-
# In Celery 2.5+, although 'BaseTask' still exists execution of
177-
# the task doesn't pass through it. For Celery 2.5+ need to wrap
178-
# the tracer instead.
179-
180-
if module.BaseTask.__module__ == module.__name__:
181-
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)
182-
183-
184163
def wrap_Celery_send_task(wrapped, instance, args, kwargs):
185164
transaction = current_transaction()
186165
if not transaction:
@@ -222,22 +201,115 @@ def wrap_worker_optimizations(wrapped, instance, args, kwargs):
222201
return result
223202

224203

204+
def wrap_task_trace_wrapper(wrapped, instance, args, kwargs):
    """Wrap billiard's per-request task tracer so the Celery task body
    runs inside a New Relic transaction (or a function trace when a
    transaction is already active).

    Any failure inside the instrumentation itself falls through to
    calling the original tracer untouched, so a broken agent never
    breaks the application's task execution.
    """
    bound_args = bind_args(wrapped, args, kwargs)
    task_name = bound_args.get("task", None)
    # "_loc" carries billiard's localized (tasks, accept, hostname)
    # tuple; fall back to the module-global cache when it is not passed.
    localized_args = bound_args.get("_loc", None) or wrapped.__globals__['_localized']
    try:
        tasks, accept, hostname = localized_args if localized_args else (None, None, None)
        task_function = tasks.get(task_name, None)
        tasks[task_name] = task_function
        bound_args["_loc"] = (tasks, accept, hostname)
        transaction = current_transaction(active_only=False)

        # Grab task name and source
        _name = task_name
        _source = task_function

        # A Celery Task can be called either outside of a transaction, or
        # within the context of an existing transaction. There are 3
        # possibilities we need to handle:
        #
        # 1. In an inactive transaction
        #
        #    If the end_of_transaction() or ignore_transaction() API calls
        #    have been invoked, this task may be called in the context
        #    of an inactive transaction. In this case, don't wrap the task
        #    in any way. Just run the original function.
        #
        # 2. In an active transaction
        #
        #    Run the original function inside a FunctionTrace.
        #
        # 3. Outside of a transaction
        #
        #    This is the typical case for a celery Task. Since it's not
        #    running inside of an existing transaction, we want to create
        #    a new background transaction for it.

        if transaction and (transaction.ignore_transaction or transaction.stopped):
            return wrapped(*args, **kwargs)

        elif transaction:
            with FunctionTrace(_name, source=_source):
                return wrapped(*args, **kwargs)

        else:
            with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
                # Attempt to grab distributed tracing headers
                try:
                    # Headers on earlier versions of Celery may end up as attributes
                    # on the request context instead of as custom headers. Handle this
                    # by defaulting to using vars() if headers is not available.
                    request = task_function.request
                    headers = getattr(request, "headers", None) or vars(request)

                    settings = transaction.settings
                    if headers is not None and settings is not None:
                        if settings.distributed_tracing.enabled:
                            transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
                        elif transaction.settings.cross_application_tracer.enabled:
                            transaction._process_incoming_cat_headers(
                                headers.get(MessageTrace.cat_id_key, None),
                                headers.get(MessageTrace.cat_transaction_key, None),
                            )
                except Exception:
                    # Best effort only: missing/odd headers must never
                    # prevent the task from running.
                    pass

                return wrapped(*args, **kwargs)

    except Exception:
        # Instrumentation failed in some unexpected way; run the task
        # untraced rather than interfering with the application.
        return wrapped(*args, **kwargs)
274+
275+
276+
def wrap_request_wrapper(wrapped, instance, args, kwargs):
    """Wrap the ``receive`` function built by billiard's Worker.

    ``receive`` returns ``(ready, request)`` where ``request`` is the
    tuple ``(job, i, task_tracer_function, _args, _kwargs)``.  Rebuild
    the request with the task tracer wrapped so that executing the task
    passes through wrap_task_trace_wrapper.
    """
    result = wrapped(*args, **kwargs)
    try:
        ready, request = result
        job, i, task_tracer_function, _args, _kwargs = request
        # Wrap the callable that actually executes the task.
        task_tracer_function = FunctionWrapper(task_tracer_function, wrap_task_trace_wrapper)
        return (ready, (job, i, task_tracer_function, _args, _kwargs))
    except Exception:
        # Unexpected result shape: hand back the original untouched.
        # NOTE: never call wrapped() a second time here -- receive
        # consumes a message from the worker's inbound queue, so a
        # second call would silently drop or duplicate work.
        return result
288+
289+
290+
def wrap_protected_receive_wrapper(wrapped, instance, args, kwargs):
    """Intercept Worker._make_protected_receive so the ``receive``
    function it builds is handed back wrapped with wrap_request_wrapper.
    """
    return FunctionWrapper(wrapped(*args, **kwargs), wrap_request_wrapper)
294+
295+
296+
def instrument_celery_local(module):
    """Instrument celery.local by wrapping Proxy.__call__.

    This covers the case where the task function is invoked directly on
    the Proxy object (rather than using "delay" or "apply_async").
    """
    if hasattr(module, "Proxy"):
        proxy_class = module.Proxy
        proxy_class.__call__ = CeleryTaskWrapper(proxy_class.__call__)
302+
303+
225304
def instrument_celery_app_base(module):
    """Instrument celery.app.base: trace outbound Celery.send_task calls."""
    celery_class = getattr(module, "Celery", None)
    if celery_class is None or not hasattr(celery_class, "send_task"):
        return
    wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
228307

229308

230309
def instrument_celery_worker(module):
231-
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.
232-
233310
if hasattr(module, "process_initializer"):
234-
# We try and force registration of default application after
235-
# fork of worker process rather than lazily on first request.
236-
237-
# Originally the 'process_initializer' function was located in
238-
# 'celery.worker'. In Celery 2.5 the function 'process_initializer'
239-
# was moved to the module 'celery.concurrency.processes'.
240-
311+
# We try and force activation of the agent before
312+
# the worker process starts.
241313
_process_initializer = module.process_initializer
242314

243315
@functools.wraps(module.process_initializer)
@@ -246,6 +318,26 @@ def process_initializer(*args, **kwargs):
246318
return _process_initializer(*args, **kwargs)
247319

248320
module.process_initializer = process_initializer
321+
322+
323+
if hasattr(module, "process_destructor"):
324+
# We try and force shutdown of the agent before
325+
# the worker process exits.
326+
_process_destructor = module.process_destructor
327+
328+
@functools.wraps(module.process_destructor)
329+
def process_destructor(*args, **kwargs):
330+
shutdown_agent()
331+
return _process_destructor(*args, **kwargs)
332+
333+
module.process_destructor = process_destructor
334+
335+
336+
def instrument_billiard_process(module):
    """Instrument billiard.process so the agent application is activated
    in each worker before Process.run starts executing tasks."""

    def _activate_application(*args, **kwargs):
        # Force activation of the default application in the freshly
        # started worker process rather than lazily on first request.
        application_instance().activate()

    wrap_pre_function(module, "Process.run", _activate_application)
249341

250342

251343
def instrument_celery_loaders_base(module):
@@ -259,8 +351,15 @@ def instrument_billiard_pool(module):
259351
def force_agent_shutdown(*args, **kwargs):
260352
shutdown_agent()
261353

262-
if hasattr(module, "Worker"):
354+
if hasattr(module, "Worker") and hasattr(module.Worker, "_make_protected_receive"):
355+
# `_make_protected_receive` returns the `receive` function
356+
# `receive` returns a tuple of (`ready`, `request`)
357+
# where `request` is a tuple of (job, i, task_tracer_function, _args, _kwargs)
358+
# and `task_tracer_function` is a function that executes the task
359+
wrap_function_wrapper(module, "Worker._make_protected_receive", wrap_protected_receive_wrapper)
360+
if hasattr(module, "Worker") and hasattr(module.Worker, "_do_exit"):
263361
wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)
362+
264363

265364

266365
def instrument_celery_app_trace(module):
@@ -270,3 +369,4 @@ def instrument_celery_app_trace(module):
270369

271370
if hasattr(module, "reset_worker_optimizations"):
272371
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
372+

0 commit comments

Comments
 (0)