Celery (re)instrumentation #1429
Open. Wants to merge 28 commits into main from celery-custom-task-instrumentation.
Commits (28, changes from all commits)
1c0166f
Add support for custom task classes
lrafeei Jul 18, 2025
926ee94
Initial commit
lrafeei Jul 22, 2025
faabca7
Distributed Tracing tests
lrafeei Jul 23, 2025
b7fa0ec
Fix remaining tests and add custom task tests
lrafeei Jul 23, 2025
a4d8232
Merge branch 'main' into celery-custom-task-instrumentation
lrafeei Jul 23, 2025
27cb3a9
Clean up comments in file
lrafeei Jul 23, 2025
ad58275
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 23, 2025
e7113a3
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 23, 2025
5c6bda0
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
0a58054
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
041cccb
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 28, 2025
1c7df3c
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Jul 30, 2025
e296bd0
Ruff format
TimPansino Jul 30, 2025
b792b55
Fix comment typo
TimPansino Aug 1, 2025
1052d28
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
9de2530
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
c77a0a2
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 6, 2025
088fdb1
Reviewer suggestions and more tests
lrafeei Aug 8, 2025
11cbdbf
Clean up commented code
lrafeei Aug 8, 2025
7f2493c
Add comments and shuffle code
lrafeei Aug 8, 2025
d908ba6
Fix celery linter errors
lrafeei Aug 8, 2025
17954ab
Fix azure linter errors
lrafeei Aug 8, 2025
ba90b90
Fix linter error
lrafeei Aug 8, 2025
7634008
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 8, 2025
fbd50a0
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 12, 2025
e3b9bef
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
01b56fb
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
b2b19b3
Merge branch 'main' into celery-custom-task-instrumentation
mergify[bot] Aug 13, 2025
7 changes: 1 addition & 6 deletions newrelic/config.py
@@ -4010,18 +4010,13 @@ def _process_module_builtin_defaults():
"instrument_rest_framework_decorators",
)

_process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
_process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
Comment on lines -4013 to -4014 (Contributor):

I presume that removing these hooks drops support for some older versions of Celery outside our support window, but let's be sure to call that out in the release notes, as I'm sure some customers out there use ancient versions of Celery.

_process_module_definition("celery.local", "newrelic.hooks.application_celery", "instrument_celery_local")
_process_module_definition("celery.app.trace", "newrelic.hooks.application_celery", "instrument_celery_app_trace")
_process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
_process_module_definition(
"celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
)
_process_module_definition(
"celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
)

_process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
_process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")

_process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
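For orientation, the hook table above drives a simple post-import registration mechanism. Below is a minimal sketch of that pattern; the internals are hypothetical and simplified (the real `_process_module_definition` in `newrelic/config.py` also handles config-based disabling and deferred imports):

```python
import importlib
import sys

# Hypothetical simplified registry: module name -> instrumentation hook
# to run as soon as that module is first imported.
_import_hooks = {}

def _process_module_definition(target, hook_module, hook_function):
    # e.g. target="celery.local",
    #      hook_module="newrelic.hooks.application_celery",
    #      hook_function="instrument_celery_local"
    _import_hooks[target] = (hook_module, hook_function)

def _notify_module_imported(module_name):
    # Invoked by an import hook once `module_name` has been loaded.
    if module_name in _import_hooks:
        hook_module, hook_function = _import_hooks.pop(module_name)
        hooks = importlib.import_module(hook_module)
        # The hook receives the live module object and patches it in
        # place (e.g. wrapping Proxy.__call__ or build_tracer).
        getattr(hooks, hook_function)(sys.modules[module_name])
```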
197 changes: 123 additions & 74 deletions newrelic/hooks/application_celery.py
@@ -28,7 +28,8 @@
from newrelic.api.message_trace import MessageTrace
from newrelic.api.pre_function import wrap_pre_function
from newrelic.api.transaction import current_transaction
from newrelic.common.object_wrapper import FunctionWrapper, _NRBoundFunctionWrapper, wrap_function_wrapper
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
from newrelic.common.signature import bind_args
from newrelic.core.agent import shutdown_agent

UNKNOWN_TASK_NAME = "<Unknown Task>"
@@ -62,6 +63,8 @@ def task_info(instance, *args, **kwargs):

return task_name, task_source

# =============
# Celery instrumentation for direct task calls (__call__ or run)

def CeleryTaskWrapper(wrapped):
def wrapper(wrapped, instance, args, kwargs):
@@ -103,15 +106,26 @@ def wrapper(wrapped, instance, args, kwargs):
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handler this
# by defaulting to using vars() if headers is not available
request = instance.request
# on the request context instead of as custom headers. Handle this
# by defaulting to using `vars()` if headers is not available

# If there is no request, the request property will return
# a new instance of `celery.Context()` instead of `None`, so
# this will be handled by accessing the request_stack directly.
request = instance and instance.request_stack and instance.request_stack.top
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
if settings.distributed_tracing.enabled:
transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
# Generate DT headers if they do not already exist in the incoming request
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
try:
dt_headers = MessageTrace.generate_request_headers(transaction)
if dt_headers:
headers.update(dict(dt_headers))
except Exception:
pass
elif transaction.settings.cross_application_tracer.enabled:
transaction._process_incoming_cat_headers(
headers.get(MessageTrace.cat_id_key, None),
Expand Down Expand Up @@ -139,7 +153,7 @@ def wrapper(wrapped, instance, args, kwargs):
# Celery has included a monkey-patching provision which did not perform this
# optimization on functions that were monkey-patched. Unfortunately, our
# wrappers are too transparent for celery to detect that they've even been
# monky-patched. To circumvent this, we set the __module__ of our wrapped task
# monkey-patched. To circumvent this, we set the __module__ of our wrapped task
# to this file which causes celery to properly detect that it has been patched.
#
# For versions of celery 2.5.3 to 2.5.5
@@ -159,85 +173,112 @@ def run(self, *args, **kwargs):
return wrapped_task


def instrument_celery_app_task(module):
# Triggered for both 'celery.app.task' and 'celery.task.base'.
def instrument_celery_local(module):
if hasattr(module, "Proxy"):
# This is used in the case where the function is
# called directly on the Proxy object (rather than
# using `delay` or `apply_async`)
module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)

if hasattr(module, "BaseTask"):
# Need to add a wrapper for background task entry point.
# =============

# In Celery 2.2 the 'BaseTask' class actually resided in the
# module 'celery.task.base'. In Celery 2.3 the 'BaseTask' class
# moved to 'celery.app.task' but an alias to it was retained in
# the module 'celery.task.base'. We need to detect both module
# imports, but we check the module name associated with
# 'BaseTask' to ensure that we do not instrument the class via
# the alias in Celery 2.3 and later.
# =============
# Celery Instrumentation for delay/apply_async/apply:

# In Celery 2.5+, although 'BaseTask' still exists execution of
# the task doesn't pass through it. For Celery 2.5+ need to wrap
# the tracer instead.
def wrap_task_call(wrapped, instance, args, kwargs):
transaction = current_transaction(active_only=False)

if module.BaseTask.__module__ == module.__name__:
module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)
# Grab task name and source
_name, _source = task_info(wrapped, *args, **kwargs)

# A Celery Task can be called either outside of a transaction, or
# within the context of an existing transaction. There are 3
# possibilities we need to handle:
#
# 1. In an inactive transaction
#
# If the end_of_transaction() or ignore_transaction() API calls
# have been invoked, this task may be called in the context
# of an inactive transaction. In this case, don't wrap the task
# in any way. Just run the original function.
#
# 2. In an active transaction
#
# Run the original function inside a FunctionTrace.
#
# 3. Outside of a transaction
#
# This is the typical case for a celery Task. Since it's not
# running inside of an existing transaction, we want to create
# a new background transaction for it.

def wrap_Celery_send_task(wrapped, instance, args, kwargs):
transaction = current_transaction()
if not transaction:
if transaction and (transaction.ignore_transaction or transaction.stopped):
return wrapped(*args, **kwargs)

# Merge distributed tracing headers into outgoing task headers
try:
dt_headers = MessageTrace.generate_request_headers(transaction)
original_headers = kwargs.get("headers", None)
if dt_headers:
if not original_headers:
kwargs["headers"] = dict(dt_headers)
else:
kwargs["headers"] = dt_headers = dict(dt_headers)
dt_headers.update(dict(original_headers))
except Exception:
pass

return wrapped(*args, **kwargs)


def wrap_worker_optimizations(wrapped, instance, args, kwargs):
# Attempt to uninstrument BaseTask before stack protection is installed or uninstalled
try:
from celery.app.task import BaseTask
elif transaction:
with FunctionTrace(_name, source=_source):
return wrapped(*args, **kwargs)

if isinstance(BaseTask.__call__, _NRBoundFunctionWrapper):
BaseTask.__call__ = BaseTask.__call__.__wrapped__
except Exception:
BaseTask = None
else:
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
# Attempt to grab distributed tracing headers
try:
# Headers on earlier versions of Celery may end up as attributes
# on the request context instead of as custom headers. Handle this
# by defaulting to using `vars()` if headers is not available

# If there is no request, the request property will return
# a new instance of `celery.Context()` instead of `None`, so
# this will be handled by accessing the request_stack directly.
request = wrapped and wrapped.request_stack and wrapped.request_stack.top
headers = getattr(request, "headers", None) or vars(request)

settings = transaction.settings
if headers is not None and settings is not None:
if settings.distributed_tracing.enabled:
# Generate DT headers if they do not already exist in the incoming request
if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
try:
dt_headers = MessageTrace.generate_request_headers(transaction)
if dt_headers:
headers.update(dict(dt_headers))
except Exception:
pass
elif transaction.settings.cross_application_tracer.enabled:
transaction._process_incoming_cat_headers(
headers.get(MessageTrace.cat_id_key, None),
headers.get(MessageTrace.cat_transaction_key, None),
)
except Exception:
pass

# Allow metaprogramming to run
result = wrapped(*args, **kwargs)
return wrapped(*args, **kwargs)

# Rewrap finalized BaseTask
if BaseTask: # Ensure imports succeeded
BaseTask.__call__ = CeleryTaskWrapper(BaseTask.__call__)

return result
def wrap_build_tracer(wrapped, instance, args, kwargs):
class TaskWrapper(FunctionWrapper):
def run(self, *args, **kwargs):
return self.__call__(*args, **kwargs)

try:
bound_args = bind_args(wrapped, args, kwargs)
task = bound_args.get("task", None)

task = TaskWrapper(task, wrap_task_call)
# Reset __module__ to be less transparent so celery detects our monkey-patching
task.__module__ = wrap_task_call.__module__
bound_args["task"] = task

def instrument_celery_app_base(module):
if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
return wrapped(**bound_args)
except:
# If we can't bind the args, we just call the wrapped function
return wrapped(*args, **kwargs)


def instrument_celery_worker(module):
# Triggered for 'celery.worker' and 'celery.concurrency.processes'.

if hasattr(module, "process_initializer"):
# We try and force registration of default application after
# fork of worker process rather than lazily on first request.

# Originally the 'process_initializer' function was located in
# 'celery.worker'. In Celery 2.5 the function 'process_initializer'
# was moved to the module 'celery.concurrency.processes'.

# We try and force activation of the agent before
# the worker process starts.
_process_initializer = module.process_initializer

@functools.wraps(module.process_initializer)
@@ -247,6 +288,18 @@ def process_initializer(*args, **kwargs):

module.process_initializer = process_initializer

if hasattr(module, "process_destructor"):
# We try and force shutdown of the agent before
# the worker process exits.
_process_destructor = module.process_destructor

@functools.wraps(module.process_destructor)
def process_destructor(*args, **kwargs):
shutdown_agent()
return _process_destructor(*args, **kwargs)

module.process_destructor = process_destructor


def instrument_celery_loaders_base(module):
def force_application_activation(*args, **kwargs):
@@ -259,14 +312,10 @@ def instrument_billiard_pool(module):
def force_agent_shutdown(*args, **kwargs):
shutdown_agent()

if hasattr(module, "Worker"):
if hasattr(module, "Worker") and hasattr(module.Worker, "_do_exit"):
wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)


def instrument_celery_app_trace(module):
# Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
if hasattr(module, "setup_worker_optimizations"):
wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations)

if hasattr(module, "reset_worker_optimizations"):
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
if hasattr(module, "build_tracer"):
wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
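The net effect of this file is a change of strategy: instead of patching `BaseTask.__call__` (which `setup_worker_optimizations` swaps out for a fast path, silently discarding the agent's wrapper), the instrumentation now intercepts `celery.app.trace.build_tracer` and hands it an already-wrapped task. A rough sketch of the idea, not the agent's actual code:

```python
import functools

def instrument_task(task):
    # Stand-in for CeleryTaskWrapper: open/attach a transaction, run the task.
    @functools.wraps(task)
    def wrapped(*args, **kwargs):
        print(f"begin transaction for {getattr(task, 'name', task)}")
        try:
            return task(*args, **kwargs)
        finally:
            print("end transaction")
    return wrapped

def make_build_tracer_wrapper(original_build_tracer):
    # build_tracer(name, task, ...) runs once per registered task at worker
    # startup; the tracer it returns executes the task body for every
    # delivered message, on both the optimized and unoptimized paths.
    # Wrapping the task here means the instrumentation rides along no
    # matter which execution path the worker uses.
    @functools.wraps(original_build_tracer)
    def wrapper(name, task, *args, **kwargs):
        return original_build_tracer(name, instrument_task(task), *args, **kwargs)
    return wrapper
```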
34 changes: 24 additions & 10 deletions tests/application_celery/_target_application.py
@@ -12,8 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from celery import Celery, shared_task
from testing_support.validators.validate_distributed_trace_accepted import validate_distributed_trace_accepted
from celery import Celery, Task, shared_task

from newrelic.api.transaction import current_transaction

@@ -47,11 +46,26 @@ def shared_task_add(x, y):
return x + y


@app.task
@validate_distributed_trace_accepted(transport_type="AMQP")
def assert_dt():
# Basic checks for DT delegated to task
txn = current_transaction()
assert txn, "No transaction active."
assert txn.name == "_target_application.assert_dt", f"Transaction name does not match: {txn.name}"
return 1
class CustomCeleryTaskWithSuper(Task):
def __call__(self, *args, **kwargs):
transaction = current_transaction()
if transaction:
transaction.add_custom_attribute("custom_task_attribute", "Called with super")
return super().__call__(*args, **kwargs)

class CustomCeleryTaskWithRun(Task):
def __call__(self, *args, **kwargs):
transaction = current_transaction()
if transaction:
transaction.add_custom_attribute("custom_task_attribute", "Called with run")
return self.run(*args, **kwargs)


@app.task(base=CustomCeleryTaskWithSuper)
def add_with_super(x, y):
return x + y


@app.task(base=CustomCeleryTaskWithRun)
def add_with_run(x, y):
return x + y
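The two base classes cover the two dispatch styles a custom `Task.__call__` can use: delegating through `super().__call__()` (Celery's normal entry point) or jumping straight to `self.run()`, which bypasses `Task.__call__` entirely and is exactly the case the `build_tracer`-level instrumentation must catch. An illustrative usage sketch (result values follow from the task bodies; the timeout is arbitrary):

```python
from _target_application import add_with_run, add_with_super

# Both calls execute on the worker; in each case the agent should record a
# background transaction carrying the "custom_task_attribute" attribute.
result_super = add_with_super.delay(1, 2)
result_run = add_with_run.delay(3, 4)

assert result_super.get(timeout=10) == 3
assert result_run.get(timeout=10) == 7
```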
12 changes: 11 additions & 1 deletion tests/application_celery/conftest.py
@@ -11,7 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from celery.app.trace import setup_worker_optimizations, reset_worker_optimizations
from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture

_default_settings = {
@@ -27,7 +29,6 @@
app_name="Python Agent Test (application_celery)", default_settings=_default_settings
)


@pytest.fixture(scope="session")
def celery_config():
# Used by celery pytest plugin to configure Celery instance
@@ -43,3 +44,12 @@ def celery_worker_parameters():
@pytest.fixture(scope="session", autouse=True)
def celery_worker_available(celery_session_worker):
return celery_session_worker


@pytest.fixture(scope="session", autouse=True, params=[False, True], ids=["unpatched", "patched"])
def with_worker_optimizations(request, celery_worker_available):
if request.param:
setup_worker_optimizations(celery_worker_available.app)

yield request.param
reset_worker_optimizations()
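The new autouse fixture parametrizes the entire session, running every test once with Celery's worker optimizations off and once with them on. A minimal sketch of the equivalent manual toggle, where `app` and `add` are assumed to be a configured Celery app and registered task:

```python
from celery.app.trace import reset_worker_optimizations, setup_worker_optimizations

# "patched" mode: install Celery's fast execution path, which replaces
# BaseTask.__call__ (historically where the agent's wrapper lived, and why
# the old instrumentation could be silently lost).
setup_worker_optimizations(app)
try:
    add.delay(1, 2)  # still traced, via the build_tracer hook
finally:
    reset_worker_optimizations()  # back to "unpatched" mode
```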