Skip to content

Celery (re)instrumentation #1429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Aug 18, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1c0166f
Add support for custom task classes
lrafeei Jul 18, 2025
926ee94
Initial commit
lrafeei Jul 22, 2025
faabca7
Distributed Tracing tests
lrafeei Jul 23, 2025
b7fa0ec
Fix remaining tests and add custom task tests
lrafeei Jul 23, 2025
a4d8232
Merge branch 'main' into celery-custom-task-instrumentation
lrafeei Jul 23, 2025
27cb3a9
Clean up comments in file
lrafeei Jul 23, 2025
ad58275
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 23, 2025
e7113a3
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 23, 2025
5c6bda0
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
0a58054
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
041cccb
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
1c7df3c
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 30, 2025
e296bd0
Ruff format
TimPansino Jul 30, 2025
b792b55
Fix comment typo
TimPansino Aug 1, 2025
1052d28
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
9de2530
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
c77a0a2
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
088fdb1
Reviewer suggestions and more tests
lrafeei Aug 8, 2025
11cbdbf
Clean up commented code
lrafeei Aug 8, 2025
7f2493c
Add comments and shuffle code
lrafeei Aug 8, 2025
d908ba6
Fix celery linter errors
lrafeei Aug 8, 2025
17954ab
Fix azure linter errors
lrafeei Aug 8, 2025
ba90b90
Fix linter error
lrafeei Aug 8, 2025
7634008
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 8, 2025
fbd50a0
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 12, 2025
e3b9bef
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
01b56fb
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
b2b19b3
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
ef587d7
MegaLinter Format
TimPansino Aug 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4010,18 +4010,13 @@ def _process_module_builtin_defaults():
"instrument_rest_framework_decorators",
)

_process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
_process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
Comment on lines -4013 to -4014
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume with removing these hooks we don't support some older versions of Celery outside our support window, but let's be sure to call that out in the release notes as I'm sure some customers out there use ancient versions of Celery.

_process_module_definition("celery.local", "newrelic.hooks.application_celery", "instrument_celery_local")
_process_module_definition("celery.app.trace", "newrelic.hooks.application_celery", "instrument_celery_app_trace")
_process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
_process_module_definition(
"celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
)
_process_module_definition(
"celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
)

_process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")

_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
Expand Down
210 changes: 125 additions & 85 deletions newrelic/hooks/application_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import FunctionWrapper, _NRBoundFunctionWrapper, wrap_function_wrapper
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
from newrelic.common.signature import bind_args
from newrelic.core.agent import shutdown_agent

UNKNOWN_TASK_NAME = "<Unknown Task>"
Expand Down Expand Up @@ -63,6 +64,94 @@ def task_info(instance, *args, **kwargs):
return task_name, task_source


def wrap_task_call(wrapped, instance, args, kwargs):
    """Wrapper applied to a Celery task's execution entry point.

    Runs the task either bare, inside a FunctionTrace, or inside a new
    BackgroundTask transaction, depending on the current transaction state
    (see the numbered cases below). When a new background transaction is
    started, inbound distributed tracing (or CAT) headers are read from the
    task's request context.
    """
    # active_only=False so we can also see stopped/ignored transactions
    # and handle case 1 below explicitly.
    transaction = current_transaction(active_only=False)

    # Grab task name and source
    _name, _source = task_info(wrapped, *args, **kwargs)

    # A Celery Task can be called either outside of a transaction, or
    # within the context of an existing transaction. There are 3
    # possibilities we need to handle:
    #
    #   1. In an inactive transaction
    #
    #      If the end_of_transaction() or ignore_transaction() API calls
    #      have been invoked, this task may be called in the context
    #      of an inactive transaction. In this case, don't wrap the task
    #      in any way. Just run the original function.
    #
    #   2. In an active transaction
    #
    #      Run the original function inside a FunctionTrace.
    #
    #   3. Outside of a transaction
    #
    #      This is the typical case for a celery Task. Since it's not
    #      running inside of an existing transaction, we want to create
    #      a new background transaction for it.

    if transaction and (transaction.ignore_transaction or transaction.stopped):
        return wrapped(*args, **kwargs)

    elif transaction:
        with FunctionTrace(_name, source=_source):
            return wrapped(*args, **kwargs)

    else:
        with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
            # Attempt to grab distributed tracing headers
            try:
                # Headers on earlier versions of Celery may end up as attributes
                # on the request context instead of as custom headers. Handle this
                # by defaulting to using vars() if headers is not available
                request = wrapped.request
                headers = getattr(request, "headers", None) or vars(request)

                settings = transaction.settings
                if headers is not None and settings is not None:
                    if settings.distributed_tracing.enabled:
                        if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
                            # No inbound trace context was accepted: generate
                            # outbound headers for this transaction and store
                            # them back on the request's headers.
                            # NOTE(review): presumably this is so trace context
                            # propagates to downstream work — confirm intent.
                            try:
                                dt_headers = MessageTrace.generate_request_headers(transaction)
                                if dt_headers:
                                    if not headers:
                                        wrapped.request.headers = dict(dt_headers)
                                    else:
                                        # Merge into the existing mapping, then
                                        # reassign so attribute-based request
                                        # contexts are updated as well.
                                        headers.update(dict(dt_headers))
                                        wrapped.request.headers = headers
                            except Exception:
                                pass
                    elif transaction.settings.cross_application_tracer.enabled:
                        # Legacy cross-application tracing fallback when
                        # distributed tracing is disabled.
                        transaction._process_incoming_cat_headers(
                            headers.get(MessageTrace.cat_id_key, None),
                            headers.get(MessageTrace.cat_transaction_key, None),
                        )
            except Exception:
                # Header extraction is best-effort; never fail the task
                # because tracing metadata could not be read.
                pass

            return wrapped(*args, **kwargs)


def wrap_build_tracer(wrapped, instance, args, kwargs):
class TaskWrapper(FunctionWrapper):
def run(self, *args, **kwargs):
return self.__call__(*args, **kwargs)

try:
bound_args = bind_args(wrapped, args, kwargs)
task = bound_args.get("task", None)

task = TaskWrapper(task, wrap_task_call)
task.__module__ = wrapped.__module__ # Ensure module is set for monkeypatching detection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what this is doing, and the comment is insufficient.

From inspecting execution, task.__module__ = 'celery.app.builtins' and wrapped.__module__ = 'celery.app.trace'.

"Ensure the module is set" doesn't really clarify what it's being set to, and "for monkeypatching detection" is confusing. What is doing the detecting and why is that important?

I disabled this line and the tests pass 100%, so if there's some reason to be doing this we need a test for it.

Copy link
Contributor Author

@lrafeei lrafeei Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining TaskWrapper so that its run() method simply delegates to the existing __call__ method covers the same case that overriding the __module__ attribute covers. I believe the __module__ override was intended as a performance optimization.

Using the scenario where the __call__ method has not been overridden and the run() method has been defined:
Without overriding the __module__ attribute, the task tracer will simply use the run() method and bypass the __call__ function. Then, the TaskWrapper class will reroute that back to the __call__ method. When overriding the __module__ attribute, the task tracer will see that the __call__ method has been overridden (which it does by traversing the MRO), and use that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That being said, whoops. That is supposed to be task.__module__ = wrap_task_call.__module__ (so wrapper function instead of wrapped function)

I figured it would be faster execution to keep that line in there, but I don't have strong feelings about removing it either.

bound_args["task"] = task

return wrapped(**bound_args)
except:
# If we can't bind the args, we just call the wrapped function
return wrapped(*args, **kwargs)


def CeleryTaskWrapper(wrapped):
def wrapper(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)
Expand Down Expand Up @@ -103,15 +192,25 @@ def wrapper(wrapped, instance, args, kwargs):
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handler this
# on the request context instead of as custom headers. Handle this
# by defaulting to using vars() if headers is not available
request = instance.request
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
if settings.distributed_tracing.enabled:
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
try:
dt_headers = MessageTrace.generate_request_headers(transaction)
if dt_headers:
if not headers:
instance.request.headers = dict(dt_headers)
else:
headers.update(dict(dt_headers))
instance.request.headers = headers
except Exception:
pass
elif transaction.settings.cross_application_tracer.enabled:
transaction._process_incoming_cat_headers(
headers.get(MessageTrace.cat_id_key, None),
Expand Down Expand Up @@ -159,85 +258,18 @@ def run(self, *args, **kwargs):
return wrapped_task


def instrument_celery_app_task(module):
    """Instrument the 'BaseTask' background task entry point.

    Triggered for both 'celery.app.task' and 'celery.task.base'.
    """
    base_task = getattr(module, "BaseTask", None)
    if base_task is None:
        return

    # In Celery 2.2 the 'BaseTask' class actually resided in the module
    # 'celery.task.base'. In Celery 2.3 it moved to 'celery.app.task',
    # leaving an alias behind in 'celery.task.base'. Both module imports
    # are detected, so compare the class's defining module against the
    # module being instrumented to avoid wrapping through the alias in
    # Celery 2.3 and later.
    if base_task.__module__ != module.__name__:
        return

    # In Celery 2.5+, although 'BaseTask' still exists, task execution no
    # longer passes through it; for those versions the tracer is wrapped
    # instead.
    base_task.__call__ = CeleryTaskWrapper(base_task.__call__)


def wrap_Celery_send_task(wrapped, instance, args, kwargs):
    """Wrapper for Celery.send_task that injects distributed tracing headers.

    When a transaction is active, generated trace headers are merged into the
    outgoing task's 'headers' keyword argument; any caller-supplied headers
    take precedence over the generated ones.
    """
    txn = current_transaction()
    if not txn:
        return wrapped(*args, **kwargs)

    # Merge distributed tracing headers into outgoing task headers.
    # Best-effort: a failure here must never block the task submission.
    try:
        outgoing = MessageTrace.generate_request_headers(txn)
        existing = kwargs.get("headers", None)
        if outgoing:
            merged = dict(outgoing)
            if existing:
                # Caller-supplied values overwrite generated ones.
                merged.update(dict(existing))
            kwargs["headers"] = merged
    except Exception:
        pass

    return wrapped(*args, **kwargs)


def wrap_worker_optimizations(wrapped, instance, args, kwargs):
    """Shared wrapper for Celery's worker-optimization setup/reset hooks.

    Temporarily removes the agent's wrapper from BaseTask.__call__ while
    Celery's metaprogramming rebinds it, then re-applies instrumentation
    to whatever __call__ ends up being afterwards.
    """
    base_task = None
    # Attempt to uninstrument BaseTask before stack protection is
    # installed or uninstalled.
    try:
        from celery.app.task import BaseTask

        base_task = BaseTask
        if isinstance(base_task.__call__, _NRBoundFunctionWrapper):
            base_task.__call__ = base_task.__call__.__wrapped__
    except Exception:
        base_task = None  # Import failed; skip rewrapping below.

    # Allow Celery's metaprogramming to run.
    result = wrapped(*args, **kwargs)

    # Rewrap the finalized BaseTask (only if the import succeeded).
    if base_task:
        base_task.__call__ = CeleryTaskWrapper(base_task.__call__)

    return result


def instrument_celery_app_base(module):
if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
def instrument_celery_local(module):
if hasattr(module, "Proxy"):
# This is used in the case where the function is
# called directly on the Proxy object (rather than
# using "delay" or "apply_async")
module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)


def instrument_celery_worker(module):
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.

if hasattr(module, "process_initializer"):
# We try and force registration of default application after
# fork of worker process rather than lazily on first request.

# Originally the 'process_initializer' function was located in
# 'celery.worker'. In Celery 2.5 the function 'process_initializer'
# was moved to the module 'celery.concurrency.processes'.

# We try and force activation of the agent before
# the worker process starts.
_process_initializer = module.process_initializer

@functools.wraps(module.process_initializer)
Expand All @@ -247,6 +279,18 @@ def process_initializer(*args, **kwargs):

module.process_initializer = process_initializer

if hasattr(module, "process_destructor"):
# We try and force shutdown of the agent before
# the worker process exits.
_process_destructor = module.process_destructor

@functools.wraps(module.process_destructor)
def process_destructor(*args, **kwargs):
shutdown_agent()
return _process_destructor(*args, **kwargs)

module.process_destructor = process_destructor


def instrument_celery_loaders_base(module):
def force_application_activation(*args, **kwargs):
Expand All @@ -259,14 +303,10 @@ def instrument_billiard_pool(module):
def force_agent_shutdown(*args, **kwargs):
shutdown_agent()

if hasattr(module, "Worker"):
if hasattr(module, "Worker") and hasattr(module.Worker, "_do_exit"):
wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)


def instrument_celery_app_trace(module):
# Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
if hasattr(module, "setup_worker_optimizations"):
wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations)

if hasattr(module, "reset_worker_optimizations"):
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
if hasattr(module, "build_tracer"):
wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
39 changes: 27 additions & 12 deletions tests/application_celery/_target_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from celery import Celery, shared_task
from testing_support.validators.validate_distributed_trace_accepted import validate_distributed_trace_accepted
from celery import Celery, Task, shared_task

from newrelic.api.transaction import current_transaction

Expand All @@ -27,11 +26,37 @@
)


class CustomCeleryTaskWithSuper(Task):
    """Custom task base class whose __call__ delegates via super().__call__."""

    def __call__(self, *args, **kwargs):
        # Tag the active transaction (if any) so tests can verify that this
        # custom __call__ override actually ran.
        active_txn = current_transaction()
        if active_txn:
            active_txn.add_custom_attribute("custom_task_attribute", "Called with super")
        return super().__call__(*args, **kwargs)


class CustomCeleryTaskWithRun(Task):
    """Custom task base class whose __call__ invokes self.run directly."""

    def __call__(self, *args, **kwargs):
        # Tag the active transaction (if any) so tests can verify that this
        # custom __call__ override actually ran.
        active_txn = current_transaction()
        if active_txn:
            active_txn.add_custom_attribute("custom_task_attribute", "Called with run")
        return self.run(*args, **kwargs)


Comment on lines +49 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, can we move these (and the tasks that use them) to the bottom of the file so the test app's functionality escalates in complexity rather than starting with the most confusing things?

@app.task
def add(x, y):
    """Return the sum of x and y (plain Celery task)."""
    total = x + y
    return total


@app.task(base=CustomCeleryTaskWithSuper)
def add_with_super(x, y):
    """Return the sum of x and y, using the super()-delegating custom base."""
    total = x + y
    return total


@app.task(base=CustomCeleryTaskWithRun)
def add_with_run(x, y):
    """Return the sum of x and y, using the run()-invoking custom base."""
    total = x + y
    return total


@app.task
def tsum(nums):
    """Return the sum of an iterable of numbers."""
    total = 0
    for value in nums:
        total += value
    return total
Expand All @@ -45,13 +70,3 @@ def nested_add(x, y):
@shared_task
def shared_task_add(x, y):
    """Return the sum of x and y (registered via celery.shared_task)."""
    result = x + y
    return result


@app.task
@validate_distributed_trace_accepted(transport_type="AMQP")
def assert_dt():
    """Task that asserts a distributed trace was accepted by the worker."""
    # Basic checks for DT delegated to task
    txn = current_transaction()
    assert txn, "No transaction active."
    expected = "_target_application.assert_dt"
    assert txn.name == expected, f"Transaction name does not match: {txn.name}"
    return 1
Loading
Loading