diff --git a/newrelic/config.py b/newrelic/config.py
index 0b2ad73567..879a4790f6 100644
--- a/newrelic/config.py
+++ b/newrelic/config.py
@@ -4010,18 +4010,13 @@ def _process_module_builtin_defaults():
         "instrument_rest_framework_decorators",
     )
 
-    _process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
-    _process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
+    _process_module_definition("celery.local", "newrelic.hooks.application_celery", "instrument_celery_local")
     _process_module_definition("celery.app.trace", "newrelic.hooks.application_celery", "instrument_celery_app_trace")
     _process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
-    _process_module_definition(
-        "celery.concurrency.processes", "newrelic.hooks.application_celery", "instrument_celery_worker"
-    )
     _process_module_definition(
         "celery.concurrency.prefork", "newrelic.hooks.application_celery", "instrument_celery_worker"
     )
-    _process_module_definition("celery.app.base", "newrelic.hooks.application_celery", "instrument_celery_app_base")
     _process_module_definition("billiard.pool", "newrelic.hooks.application_celery", "instrument_billiard_pool")
 
     _process_module_definition("flup.server.cgi", "newrelic.hooks.adapter_flup", "instrument_flup_server_cgi")
diff --git a/newrelic/hooks/application_celery.py b/newrelic/hooks/application_celery.py
index 52c7892d03..fbcfa89a35 100644
--- a/newrelic/hooks/application_celery.py
+++ b/newrelic/hooks/application_celery.py
@@ -28,7 +28,8 @@
 from newrelic.api.message_trace import MessageTrace
 from newrelic.api.pre_function import wrap_pre_function
 from newrelic.api.transaction import current_transaction
-from newrelic.common.object_wrapper import FunctionWrapper, _NRBoundFunctionWrapper, wrap_function_wrapper
+from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
+from newrelic.common.signature import bind_args
 from newrelic.core.agent import shutdown_agent
 
 UNKNOWN_TASK_NAME = "<Unknown Task>"
@@ -62,6 +63,8 @@ def task_info(instance, *args, **kwargs):
 
     return task_name, task_source
 
+# =============
+# Celery instrumentation for direct task calls (__call__ or run)
 
 def CeleryTaskWrapper(wrapped):
     def wrapper(wrapped, instance, args, kwargs):
@@ -103,15 +106,26 @@ def wrapper(wrapped, instance, args, kwargs):
                 # Attempt to grab distributed tracing headers
                 try:
                     # Headers on earlier versions of Celery may end up as attributes
-                    # on the request context instead of as custom headers. Handler this
-                    # by defaulting to using vars() if headers is not available
-                    request = instance.request
+                    # on the request context instead of as custom headers. Handle this
+                    # by defaulting to using `vars()` if headers is not available.
+
+                    # If there is no request, the request property will return
+                    # a new instance of `celery.Context()` instead of `None`, so
+                    # this is handled by accessing the request_stack directly.
+                    request = instance and instance.request_stack and instance.request_stack.top
                     headers = getattr(request, "headers", None) or vars(request)
 
                     settings = transaction.settings
                     if headers is not None and settings is not None:
                         if settings.distributed_tracing.enabled:
-                            transaction.accept_distributed_trace_headers(headers, transport_type="AMQP")
+                            # Generate DT headers if they do not already exist in the incoming request
+                            if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
+                                try:
+                                    dt_headers = MessageTrace.generate_request_headers(transaction)
+                                    if dt_headers:
+                                        headers.update(dict(dt_headers))
+                                except Exception:
+                                    pass
                         elif transaction.settings.cross_application_tracer.enabled:
                             transaction._process_incoming_cat_headers(
                                 headers.get(MessageTrace.cat_id_key, None),
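The hunk above reads `request_stack.top` instead of the `request` property because of the Celery quirk called out in the comment: when no task is executing, `Task.request` synthesizes an empty `Context` rather than returning `None`. A standalone sketch of that behavior, illustrative only and not part of the patch (assumes Celery 4+ semantics; the throwaway app needs no broker):

```python
from celery import Celery

app = Celery("demo")  # hypothetical throwaway app

@app.task
def noop():
    pass

# Outside of task execution the request stack is empty, so `top` is None...
assert noop.request_stack.top is None

# ...but the `request` property synthesizes a blank Context instead of
# returning None, so a None-check on `task.request` can never detect
# "no active request".
assert noop.request is not None
```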
@@ -139,7 +153,7 @@ def wrapper(wrapped, instance, args, kwargs):
     # Celery has included a monkey-patching provision which did not perform this
     # optimization on functions that were monkey-patched. Unfortunately, our
     # wrappers are too transparent for celery to detect that they've even been
-    # monky-patched. To circumvent this, we set the __module__ of our wrapped task
+    # monkey-patched. To circumvent this, we set the __module__ of our wrapped task
     # to this file which causes celery to properly detect that it has been patched.
     #
     # For versions of celery 2.5.3 to 2.5.5
@@ -159,85 +173,112 @@ def run(self, *args, **kwargs):
 
     return wrapped_task
 
 
-def instrument_celery_app_task(module):
-    # Triggered for both 'celery.app.task' and 'celery.task.base'.
+def instrument_celery_local(module):
+    if hasattr(module, "Proxy"):
+        # This is used in the case where the function is
+        # called directly on the Proxy object (rather than
+        # using `delay` or `apply_async`)
+        module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)
 
-    if hasattr(module, "BaseTask"):
-        # Need to add a wrapper for background task entry point.
+# =============
 
-        # In Celery 2.2 the 'BaseTask' class actually resided in the
-        # module 'celery.task.base'. In Celery 2.3 the 'BaseTask' class
-        # moved to 'celery.app.task' but an alias to it was retained in
-        # the module 'celery.task.base'. We need to detect both module
-        # imports, but we check the module name associated with
-        # 'BaseTask' to ensure that we do not instrument the class via
-        # the alias in Celery 2.3 and later.
+# =============
+# Celery instrumentation for delay/apply_async/apply:
 
-        # In Celery 2.5+, although 'BaseTask' still exists execution of
-        # the task doesn't pass through it. For Celery 2.5+ need to wrap
-        # the tracer instead.
+def wrap_task_call(wrapped, instance, args, kwargs):
+    transaction = current_transaction(active_only=False)
 
-        if module.BaseTask.__module__ == module.__name__:
-            module.BaseTask.__call__ = CeleryTaskWrapper(module.BaseTask.__call__)
+    # Grab task name and source
+    _name, _source = task_info(wrapped, *args, **kwargs)
 
+    # A Celery Task can be called either outside of a transaction, or
+    # within the context of an existing transaction. There are 3
+    # possibilities we need to handle:
+    #
+    #   1. In an inactive transaction
+    #
+    #      If the end_of_transaction() or ignore_transaction() API calls
+    #      have been invoked, this task may be called in the context
+    #      of an inactive transaction. In this case, don't wrap the task
+    #      in any way. Just run the original function.
+    #
+    #   2. In an active transaction
+    #
+    #      Run the original function inside a FunctionTrace.
+    #
+    #   3. Outside of a transaction
+    #
+    #      This is the typical case for a celery Task. Since it's not
+    #      running inside of an existing transaction, we want to create
+    #      a new background transaction for it.
 
-def wrap_Celery_send_task(wrapped, instance, args, kwargs):
-    transaction = current_transaction()
-    if not transaction:
+    if transaction and (transaction.ignore_transaction or transaction.stopped):
         return wrapped(*args, **kwargs)
 
-    # Merge distributed tracing headers into outgoing task headers
-    try:
-        dt_headers = MessageTrace.generate_request_headers(transaction)
-        original_headers = kwargs.get("headers", None)
-        if dt_headers:
-            if not original_headers:
-                kwargs["headers"] = dict(dt_headers)
-            else:
-                kwargs["headers"] = dt_headers = dict(dt_headers)
-                dt_headers.update(dict(original_headers))
-    except Exception:
-        pass
-
-    return wrapped(*args, **kwargs)
-
-
-def wrap_worker_optimizations(wrapped, instance, args, kwargs):
-    # Attempt to uninstrument BaseTask before stack protection is installed or uninstalled
-    try:
-        from celery.app.task import BaseTask
+    elif transaction:
+        with FunctionTrace(_name, source=_source):
+            return wrapped(*args, **kwargs)
 
-        if isinstance(BaseTask.__call__, _NRBoundFunctionWrapper):
-            BaseTask.__call__ = BaseTask.__call__.__wrapped__
-    except Exception:
-        BaseTask = None
+    else:
+        with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
+            # Attempt to grab distributed tracing headers
+            try:
+                # Headers on earlier versions of Celery may end up as attributes
+                # on the request context instead of as custom headers. Handle this
+                # by defaulting to using `vars()` if headers is not available.
+
+                # If there is no request, the request property will return
+                # a new instance of `celery.Context()` instead of `None`, so
+                # this is handled by accessing the request_stack directly.
+                request = wrapped and wrapped.request_stack and wrapped.request_stack.top
+                headers = getattr(request, "headers", None) or vars(request)
+
+                settings = transaction.settings
+                if headers is not None and settings is not None:
+                    if settings.distributed_tracing.enabled:
+                        # Generate DT headers if they do not already exist in the incoming request
+                        if not transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
+                            try:
+                                dt_headers = MessageTrace.generate_request_headers(transaction)
+                                if dt_headers:
+                                    headers.update(dict(dt_headers))
+                            except Exception:
+                                pass
+                    elif transaction.settings.cross_application_tracer.enabled:
+                        transaction._process_incoming_cat_headers(
+                            headers.get(MessageTrace.cat_id_key, None),
+                            headers.get(MessageTrace.cat_transaction_key, None),
+                        )
+            except Exception:
+                pass
 
-    # Allow metaprogramming to run
-    result = wrapped(*args, **kwargs)
+            return wrapped(*args, **kwargs)
 
-    # Rewrap finalized BaseTask
-    if BaseTask:  # Ensure imports succeeded
-        BaseTask.__call__ = CeleryTaskWrapper(BaseTask.__call__)
-    return result
 
+def wrap_build_tracer(wrapped, instance, args, kwargs):
+    class TaskWrapper(FunctionWrapper):
+        def run(self, *args, **kwargs):
+            return self.__call__(*args, **kwargs)
+
+    try:
+        bound_args = bind_args(wrapped, args, kwargs)
+        task = bound_args.get("task", None)
+        task = TaskWrapper(task, wrap_task_call)
+        # Reset __module__ to be less transparent so celery detects our monkey-patching
+        task.__module__ = wrap_task_call.__module__
+        bound_args["task"] = task
 
-def instrument_celery_app_base(module):
-    if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
-        wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
+        return wrapped(**bound_args)
+    except Exception:
+        # If we can't bind the args, we just call the wrapped function
+        return wrapped(*args, **kwargs)
 
 
 def instrument_celery_worker(module):
-    # Triggered for 'celery.worker' and 'celery.concurrency.processes'.
-
     if hasattr(module, "process_initializer"):
-        # We try and force registration of default application after
-        # fork of worker process rather than lazily on first request.
-
-        # Originally the 'process_initializer' function was located in
-        # 'celery.worker'. In Celery 2.5 the function 'process_initializer'
-        # was moved to the module 'celery.concurrency.processes'.
-
+        # We try and force activation of the agent before
+        # the worker process starts.
         _process_initializer = module.process_initializer
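In `wrap_build_tracer` above, `bind_args` normalizes positional and keyword arguments into one name-to-value mapping, so the wrapper can swap in the wrapped task regardless of how Celery invoked `build_tracer`. A rough sketch of the mechanism (the `build_tracer` stand-in below is simplified; the real `celery.app.trace.build_tracer` takes several more parameters):

```python
import inspect

def bind_args(func, args, kwargs):
    # Approximately what newrelic.common.signature.bind_args provides (sketch).
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return bound.arguments  # mapping of parameter name -> value

def build_tracer(name, task, loader=None):  # simplified stand-in
    return task

# Whether `task` arrives positionally or by keyword, the bound mapping looks
# the same, so `bound_args["task"] = wrapped_task` covers both call styles.
bound = bind_args(build_tracer, ("tasks.add", "original-task"), {})
bound["task"] = "wrapped-task"
assert build_tracer(**bound) == "wrapped-task"
```

The nested `TaskWrapper.run` alias matters because `build_tracer` executes `task.run` directly when the task has no custom `__call__`; routing `run` back through `__call__` keeps the wrapper in the execution path.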
@@ -247,6 +288,18 @@ def process_initializer(*args, **kwargs):
 
         module.process_initializer = process_initializer
 
+    if hasattr(module, "process_destructor"):
+        # We try and force shutdown of the agent before
+        # the worker process exits.
+        _process_destructor = module.process_destructor
+
+        @functools.wraps(module.process_destructor)
+        def process_destructor(*args, **kwargs):
+            shutdown_agent()
+            return _process_destructor(*args, **kwargs)
+
+        module.process_destructor = process_destructor
+
 
 def instrument_celery_loaders_base(module):
     def force_application_activation(*args, **kwargs):
@@ -259,14 +312,10 @@ def instrument_billiard_pool(module):
     def force_agent_shutdown(*args, **kwargs):
         shutdown_agent()
 
-    if hasattr(module, "Worker"):
+    if hasattr(module, "Worker") and hasattr(module.Worker, "_do_exit"):
         wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)
 
 
 def instrument_celery_app_trace(module):
-    # Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
-    if hasattr(module, "setup_worker_optimizations"):
-        wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations)
-
-    if hasattr(module, "reset_worker_optimizations"):
-        wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
+    if hasattr(module, "build_tracer"):
+        wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
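Both `CeleryTaskWrapper` and `wrap_task_call` above use the same header-handling sequence. Distilled into one helper as a sketch of the shared pattern (error handling and the CAT fallback elided):

```python
def link_distributed_trace(transaction, headers):
    # Prefer trace context propagated by the caller...
    if transaction.accept_distributed_trace_headers(headers, transport_type="AMQP"):
        return
    # ...otherwise generate DT headers so they exist on the request, per the
    # "generate if they do not already exist" comment in the wrappers.
    dt_headers = MessageTrace.generate_request_headers(transaction)
    if dt_headers:
        headers.update(dict(dt_headers))
```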
diff --git a/tests/application_celery/_target_application.py b/tests/application_celery/_target_application.py
index c7c8578d18..f8f3566a06 100644
--- a/tests/application_celery/_target_application.py
+++ b/tests/application_celery/_target_application.py
@@ -12,8 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from celery import Celery, shared_task
-from testing_support.validators.validate_distributed_trace_accepted import validate_distributed_trace_accepted
+from celery import Celery, Task, shared_task
 
 from newrelic.api.transaction import current_transaction
 
@@ -47,11 +46,26 @@ def shared_task_add(x, y):
     return x + y
 
 
-@app.task
-@validate_distributed_trace_accepted(transport_type="AMQP")
-def assert_dt():
-    # Basic checks for DT delegated to task
-    txn = current_transaction()
-    assert txn, "No transaction active."
-    assert txn.name == "_target_application.assert_dt", f"Transaction name does not match: {txn.name}"
-    return 1
+class CustomCeleryTaskWithSuper(Task):
+    def __call__(self, *args, **kwargs):
+        transaction = current_transaction()
+        if transaction:
+            transaction.add_custom_attribute("custom_task_attribute", "Called with super")
+        return super().__call__(*args, **kwargs)
+
+class CustomCeleryTaskWithRun(Task):
+    def __call__(self, *args, **kwargs):
+        transaction = current_transaction()
+        if transaction:
+            transaction.add_custom_attribute("custom_task_attribute", "Called with run")
+        return self.run(*args, **kwargs)
+
+
+@app.task(base=CustomCeleryTaskWithSuper)
+def add_with_super(x, y):
+    return x + y
+
+
+@app.task(base=CustomCeleryTaskWithRun)
+def add_with_run(x, y):
+    return x + y
\ No newline at end of file
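The two base classes exercise the gap that motivated wrapping `build_tracer`: a task whose `__call__` dispatches straight to `self.run()` never goes through the parent class's `__call__`, so instrumentation attached only to `Task.__call__`/`BaseTask.__call__` misses it. A dependency-free sketch of the difference:

```python
class Base:  # stands in for celery's Task
    def __call__(self, *args):
        print("Base.__call__ saw the call")  # where the old hook lived
        return self.run(*args)

    def run(self, *args):
        raise NotImplementedError

class WithSuper(Base):
    def __call__(self, *args):
        return super().__call__(*args)  # flows through Base.__call__

    def run(self, x, y):
        return x + y

class WithRun(Base):
    def __call__(self, *args):
        return self.run(*args)  # bypasses Base.__call__ entirely

    def run(self, x, y):
        return x + y

WithSuper()(1, 2)  # prints "Base.__call__ saw the call"
WithRun()(1, 2)    # prints nothing; a wrapper on Base.__call__ is skipped
```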
diff --git a/tests/application_celery/conftest.py b/tests/application_celery/conftest.py
index 5531ef7d7a..3d9e4eda68 100644
--- a/tests/application_celery/conftest.py
+++ b/tests/application_celery/conftest.py
@@ -11,7 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import pytest
+from celery.app.trace import setup_worker_optimizations, reset_worker_optimizations
 from testing_support.fixtures import collector_agent_registration_fixture, collector_available_fixture
 
 _default_settings = {
@@ -27,7 +29,6 @@
     app_name="Python Agent Test (application_celery)", default_settings=_default_settings
 )
 
-
 @pytest.fixture(scope="session")
 def celery_config():
     # Used by celery pytest plugin to configure Celery instance
@@ -43,3 +44,12 @@ def celery_worker_parameters():
 @pytest.fixture(scope="session", autouse=True)
 def celery_worker_available(celery_session_worker):
     return celery_session_worker
+
+
+@pytest.fixture(scope="session", autouse=True, params=[False, True], ids=["unpatched", "patched"])
+def with_worker_optimizations(request, celery_worker_available):
+    if request.param:
+        setup_worker_optimizations(celery_worker_available.app)
+
+    yield request.param
+    reset_worker_optimizations()
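Note that this fixture moved here from test_task_methods.py and widened from module scope to session scope, so every test in the package now runs both with and without Celery's worker optimizations. Parametrizing an autouse session fixture is what multiplies the whole run; a minimal standalone illustration (hypothetical file, not part of the change):

```python
import pytest

@pytest.fixture(scope="session", autouse=True, params=["unpatched", "patched"])
def mode(request):
    # Each param creates a separate session-scoped fixture instance, and
    # every collected test executes once per instance.
    yield request.param

def test_anything(mode):
    assert mode in ("unpatched", "patched")  # runs twice
```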
diff --git a/tests/application_celery/test_distributed_tracing.py b/tests/application_celery/test_distributed_tracing.py
index a1542430b7..4a9d797892 100644
--- a/tests/application_celery/test_distributed_tracing.py
+++ b/tests/application_celery/test_distributed_tracing.py
@@ -12,52 +12,172 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from _target_application import add, assert_dt
+import pytest
+from _target_application import add
 from testing_support.fixtures import override_application_settings
 from testing_support.validators.validate_transaction_count import validate_transaction_count
 from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
 
 from newrelic.api.background_task import background_task
+from newrelic.api.transaction import insert_distributed_trace_headers
 
 
-@validate_transaction_metrics(
-    name="_target_application.assert_dt",
-    group="Celery",
-    rollup_metrics=[
-        ("Supportability/DistributedTrace/AcceptPayload/Success", None),
-        ("Supportability/TraceContext/Accept/Success", 1),
-    ],
-    background_task=True,
-    index=-2,
-)
-@validate_transaction_metrics(
-    name="test_distributed_tracing:test_celery_task_distributed_tracing_enabled", background_task=True
-)
-@validate_transaction_count(2)
-@background_task()
-def test_celery_task_distributed_tracing_enabled():
-    result = assert_dt.apply_async()
-    result = result.get()
-    assert result == 1
-
-
-@override_application_settings({"distributed_tracing.enabled": False})
-@validate_transaction_metrics(
-    name="_target_application.add",
-    group="Celery",
-    rollup_metrics=[
-        ("Supportability/DistributedTrace/AcceptPayload/Success", None),
-        ("Supportability/TraceContext/Accept/Success", None),  # No trace context should be accepted
-    ],
-    background_task=True,
-    index=-2,
-)
-@validate_transaction_metrics(
-    name="test_distributed_tracing:test_celery_task_distributed_tracing_disabled", background_task=True
-)
-@validate_transaction_count(2)
-@background_task()
-def test_celery_task_distributed_tracing_disabled():
-    result = add.apply_async((1, 2))
-    result = result.get()
-    assert result == 3
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_inside_transaction_delay(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="_target_application.add",
+        group="Celery",
+        rollup_metrics=[
+            ("Supportability/TraceContext/Accept/Success", 1 if dt_enabled else None),
+            ("Supportability/TraceContext/TraceParent/Accept/Success", 1 if dt_enabled else None),
+        ],
+        background_task=True,
+        index=-2,
+    )
+    @validate_transaction_metrics(
+        name="test_distributed_tracing:test_DT_inside_transaction_delay.<locals>._test",
+        rollup_metrics=[
+            ("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
+            ("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(2)
+    # One for the background task, one for the Celery task. Runs in different processes.
+    @background_task()
+    def _test():
+        result = add.delay(1, 2)
+        result = result.get()
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_outside_transaction_delay(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="_target_application.add",
+        group="Celery",
+        rollup_metrics=[
+            ("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
+            ("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(1)
+    def _test():
+        result = add.delay(1, 2)
+        result = result.get()
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_inside_transaction_apply(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="test_distributed_tracing:test_DT_inside_transaction_apply.<locals>._test",
+        rollup_metrics=[
+            ("Function/_target_application.add", 1),
+        ],
+        scoped_metrics=[
+            ("Function/_target_application.add", 1),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(1)  # In the same process, so only one transaction
+    @background_task()
+    def _test():
+        result = add.apply((1, 2))
+        result = result.get()
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_inside_transaction_apply_with_added_headers(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="test_distributed_tracing:test_DT_inside_transaction_apply_with_added_headers.<locals>._test",
+        rollup_metrics=[
+            ("Function/_target_application.add", 1),
+            ("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
+            ("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
+        ],
+        scoped_metrics=[
+            ("Function/_target_application.add", 1),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(1)  # In the same process, so only one transaction
+    @background_task()
+    def _test():
+        headers = []
+        insert_distributed_trace_headers(headers)
+        result = add.apply((1, 2), headers=headers)
+        result = result.get()
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_outside_transaction_apply(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="_target_application.add",
+        group="Celery",
+        rollup_metrics=[
+            ("Supportability/TraceContext/Create/Success", 1 if dt_enabled else None),
+            ("Supportability/DistributedTrace/CreatePayload/Success", 1 if dt_enabled else None),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(1)  # In the same process, so only one transaction
+    def _test():
+        result = add.apply((1, 2))
+        result = result.get()
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_inside_transaction__call__(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="test_distributed_tracing:test_DT_inside_transaction__call__.<locals>._test",
+        rollup_metrics=[
+            ("Function/_target_application.add", 1),
+        ],
+        scoped_metrics=[
+            ("Function/_target_application.add", 1),
+        ],
+        background_task=True,
+    )
+    @validate_transaction_count(1)  # In the same process, so only one transaction
+    @background_task()
+    def _test():
+        result = add(1, 2)
+        assert result == 3
+
+    _test()
+
+
+@pytest.mark.parametrize("dt_enabled", [True, False])
+def test_DT_outside_transaction__call__(dt_enabled):
+    @override_application_settings({"distributed_tracing.enabled": dt_enabled})
+    @validate_transaction_metrics(
+        name="_target_application.add",
+        group="Celery",
+        background_task=True,
+    )
+    @validate_transaction_count(1)  # In the same process, so only one transaction
+    def _test():
+        result = add(1, 2)
+        assert result == 3
+
+    _test()
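In `test_DT_inside_transaction_apply_with_added_headers` above, `headers` is a plain list that `insert_distributed_trace_headers` fills with `(name, value)` pairs. Roughly, assuming distributed tracing is enabled (values are per-transaction, and the exact key set depends on agent settings):

```python
headers = []
insert_distributed_trace_headers(headers)
# headers now resembles:
#   [("traceparent", "00-<trace-id>-<parent-id>-01"),
#    ("tracestate", "<vendor entries>"),
#    ("newrelic", "<New Relic payload>")]
result = add.apply((1, 2), headers=headers)
```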
diff --git a/tests/application_celery/test_task_methods.py b/tests/application_celery/test_task_methods.py
index bf6e86e414..d5103e8b74 100644
--- a/tests/application_celery/test_task_methods.py
+++ b/tests/application_celery/test_task_methods.py
@@ -12,30 +12,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import celery
-import pytest
-from _target_application import add, tsum
+from celery import chain, group, chord
+from _target_application import add, tsum, add_with_super, add_with_run
 from testing_support.validators.validate_code_level_metrics import validate_code_level_metrics
+from testing_support.validators.validate_custom_parameters import validate_custom_parameters
 from testing_support.validators.validate_transaction_count import validate_transaction_count
 from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
-
-FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)]
-
-
-@pytest.fixture(scope="module", autouse=True, params=[False, True], ids=["unpatched", "patched"])
-def with_worker_optimizations(request, celery_worker_available):
-    if request.param:
-        celery.app.trace.setup_worker_optimizations(celery_worker_available.app)
-
-    yield request.param
-    celery.app.trace.reset_worker_optimizations()
+from newrelic.api.background_task import background_task
 
 
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
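A reading aid for the metric tuples used throughout this file: the count position distinguishes asserted metrics from forgone ones, which is what the old shared constant's FORGONE_TASK_METRICS name referred to.

```python
# An integer asserts the exact recorded call count for the metric; None
# asserts the metric was NOT recorded at all (a "forgone" metric).
scoped_metrics = [("Function/_target_application.add", None)]
```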
@@ -51,8 +42,8 @@ def test_celery_task_call():
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
@@ -69,8 +60,8 @@ def test_celery_task_apply():
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
@@ -87,8 +78,8 @@ def test_celery_task_delay():
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
@@ -105,8 +96,8 @@ def test_celery_task_apply_async():
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
@@ -123,8 +114,8 @@ def test_celery_app_send_task(celery_session_app):
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_code_level_metrics("_target_application", "add")
@@ -141,15 +132,15 @@ def test_celery_task_signature():
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
 )
 @validate_transaction_metrics(
     name="_target_application.add",
     group="Celery",
-    scoped_metrics=FORGONE_TASK_METRICS,
-    rollup_metrics=FORGONE_TASK_METRICS,
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
     background_task=True,
     index=-2,
 )
""" - result = celery.chain(add.s(3, 4), add.s(5))() + result = chain(add.s(3, 4), add.s(5))() result = result.get() assert result == 12 @@ -196,15 +187,15 @@ def test_celery_chain(): @validate_transaction_metrics( name="_target_application.add", group="Celery", - scoped_metrics=FORGONE_TASK_METRICS, - rollup_metrics=FORGONE_TASK_METRICS, + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, ) @validate_transaction_metrics( name="_target_application.add", group="Celery", - scoped_metrics=FORGONE_TASK_METRICS, - rollup_metrics=FORGONE_TASK_METRICS, + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, index=-2, ) @@ -215,7 +206,7 @@ def test_celery_group(): """ Executes multiple tasks on worker process and returns an AsyncResult. """ - result = celery.group(add.s(3, 4), add.s(1, 2))() + result = group(add.s(3, 4), add.s(1, 2))() result = result.get() assert result == [7, 3] @@ -223,23 +214,23 @@ def test_celery_group(): @validate_transaction_metrics( name="_target_application.tsum", group="Celery", - scoped_metrics=FORGONE_TASK_METRICS, - rollup_metrics=FORGONE_TASK_METRICS, + scoped_metrics=[("Function/_target_application.tsum", None)], + rollup_metrics=[("Function/_target_application.tsum", None)], background_task=True, ) @validate_transaction_metrics( name="_target_application.add", group="Celery", - scoped_metrics=FORGONE_TASK_METRICS, - rollup_metrics=FORGONE_TASK_METRICS, + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, index=-2, ) @validate_transaction_metrics( name="_target_application.add", group="Celery", - scoped_metrics=FORGONE_TASK_METRICS, - rollup_metrics=FORGONE_TASK_METRICS, + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, index=-3, ) @@ -251,7 +242,7 @@ def test_celery_chord(): """ Executes 2 add tasks, followed by a tsum task on the worker process and returns an AsyncResult. 
""" - result = celery.chord([add.s(3, 4), add.s(1, 2)])(tsum.s()) + result = chord([add.s(3, 4), add.s(1, 2)])(tsum.s()) result = result.get() assert result == 10 @@ -259,11 +250,11 @@ def test_celery_chord(): @validate_transaction_metrics( name="celery.map/_target_application.tsum", group="Celery", - scoped_metrics=[("Function/_target_application.tsum", 2)], - rollup_metrics=[("Function/_target_application.tsum", 2)], + scoped_metrics=[("Function/_target_application.tsum", None)], + rollup_metrics=[("Function/_target_application.tsum", None)], background_task=True, ) -@validate_code_level_metrics("_target_application", "tsum", count=3) +@validate_code_level_metrics("_target_application", "tsum", count=1) @validate_transaction_count(1) def test_celery_task_map(): """ @@ -277,11 +268,11 @@ def test_celery_task_map(): @validate_transaction_metrics( name="celery.starmap/_target_application.add", group="Celery", - scoped_metrics=[("Function/_target_application.add", 2)], - rollup_metrics=[("Function/_target_application.add", 2)], + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, ) -@validate_code_level_metrics("_target_application", "add", count=3) +@validate_code_level_metrics("_target_application", "add", count=1) @validate_transaction_count(1) def test_celery_task_starmap(): """ @@ -292,23 +283,24 @@ def test_celery_task_starmap(): assert result == [7, 3] +@validate_transaction_metrics(name="celery.starmap/_target_application.add", group="Celery", background_task=True) @validate_transaction_metrics( name="celery.starmap/_target_application.add", group="Celery", - scoped_metrics=[("Function/_target_application.add", 1)], - rollup_metrics=[("Function/_target_application.add", 1)], + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, ) @validate_transaction_metrics( name="celery.starmap/_target_application.add", group="Celery", - scoped_metrics=[("Function/_target_application.add", 1)], - rollup_metrics=[("Function/_target_application.add", 1)], + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], background_task=True, index=-2, ) -@validate_code_level_metrics("_target_application", "add", count=2) -@validate_code_level_metrics("_target_application", "add", count=2, index=-2) +@validate_code_level_metrics("_target_application", "add", count=1) +@validate_code_level_metrics("_target_application", "add", count=1, index=-2) @validate_transaction_count(2) def test_celery_task_chunks(): """ @@ -317,3 +309,243 @@ def test_celery_task_chunks(): result = add.chunks([(3, 4), (1, 2)], n=1).apply_async() result = result.get() assert result == [[7], [3]] + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with super")], +) +@validate_transaction_metrics( + name="_target_application.add_with_super", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_super", None)], + rollup_metrics=[("Function/_target_application.add_with_super", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_super") +@validate_transaction_count(1) +def test_celery_task_call_custom_super(): + """ + Executes task in local process and returns the result directly. 
+ """ + result = add_with_super(3, 4) + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with super")], +) +@validate_transaction_metrics( + name="_target_application.add_with_super", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_super", None)], + rollup_metrics=[("Function/_target_application.add_with_super", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_super") +@validate_transaction_count(1) +def test_celery_task_apply_custom_super(): + """ + Executes task in local process and returns an EagerResult. + """ + result = add_with_super.apply((3, 4)) + result = result.get() + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with super")], +) +@validate_transaction_metrics( + name="_target_application.add_with_super", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_super", None)], + rollup_metrics=[("Function/_target_application.add_with_super", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_super") +@validate_transaction_count(1) +def test_celery_task_delay_custom_super(): + """ + Executes task on worker process and returns an AsyncResult. + """ + result = add_with_super.delay(3, 4) + result = result.get() + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with super")], +) +@validate_transaction_metrics( + name="_target_application.add_with_super", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_super", None)], + rollup_metrics=[("Function/_target_application.add_with_super", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_super") +@validate_transaction_count(1) +def test_celery_task_apply_async_custom_super(): + """ + Executes task on worker process and returns an AsyncResult. + """ + result = add_with_super.apply_async((3, 4)) + result = result.get() + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with run")], +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_run") +@validate_transaction_count(1) +def test_celery_task_call_custom_run(): + """ + Executes task in local process and returns the result directly. + """ + result = add_with_run(3, 4) + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with run")], +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_run") +@validate_transaction_count(1) +def test_celery_task_apply_custom_run(): + """ + Executes task in local process and returns an EagerResult. 
+ """ + result = add_with_run.apply((3, 4)) + result = result.get() + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with run")], +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_run") +@validate_transaction_count(1) +def test_celery_task_delay_custom_run(): + """ + Executes task on worker process and returns an AsyncResult. + """ + result = add_with_run.delay(3, 4) + result = result.get() + assert result == 7 + + +@validate_custom_parameters( + required_params=[("custom_task_attribute", "Called with run")], +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, +) +@validate_code_level_metrics("_target_application", "add_with_run") +@validate_transaction_count(1) +def test_celery_task_apply_async_custom_run(): + """ + Executes task on worker process and returns an AsyncResult. + """ + result = add_with_run.apply_async((3, 4)) + result = result.get() + assert result == 7 + + +@validate_transaction_metrics( + name="_target_application.add", + group="Celery", + scoped_metrics=[("Function/_target_application.add", None)], + rollup_metrics=[("Function/_target_application.add", None)], + background_task=True, +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, + index=-2, +) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add_with_run", index=-2) +@validate_transaction_count(2) +def test_celery_task_different_processes(): + """ + Executes two tasks, one in the main process and one in a worker process. + """ + result = add_with_run.delay(3, 4) + result = result.get() + assert result == 7 + + result = add.apply((3, 4)) + result = result.get() + assert result == 7 + + +@validate_transaction_metrics( + name="test_task_methods:test_celery_task_in_transaction_different_processes", + scoped_metrics=[("Function/_target_application.add", 1)], + rollup_metrics=[("Function/_target_application.add", 1)], + background_task=True, +) +@validate_transaction_metrics( + name="_target_application.add_with_run", + group="Celery", + scoped_metrics=[("Function/_target_application.add_with_run", None)], + rollup_metrics=[("Function/_target_application.add_with_run", None)], + background_task=True, + index=-2, +) +@validate_code_level_metrics("_target_application", "add") +@validate_code_level_metrics("_target_application", "add_with_run", index=-2) +@validate_transaction_count(2) +@background_task() +def test_celery_task_in_transaction_different_processes(): + """ + This test demonstrates the limitations of the agent with regards + to multiprocessing, namely when it involves transactions in a main + process and worker processes. + + Two tasks are executed, one in the main process and one in a worker + process. A transaction (background task) is created in the context + of the main process. 
+@validate_custom_parameters(
+    required_params=[("custom_task_attribute", "Called with run")],
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+)
+@validate_code_level_metrics("_target_application", "add_with_run")
+@validate_transaction_count(1)
+def test_celery_task_call_custom_run():
+    """
+    Executes task in local process and returns the result directly.
+    """
+    result = add_with_run(3, 4)
+    assert result == 7
+
+
+@validate_custom_parameters(
+    required_params=[("custom_task_attribute", "Called with run")],
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+)
+@validate_code_level_metrics("_target_application", "add_with_run")
+@validate_transaction_count(1)
+def test_celery_task_apply_custom_run():
+    """
+    Executes task in local process and returns an EagerResult.
+    """
+    result = add_with_run.apply((3, 4))
+    result = result.get()
+    assert result == 7
+
+
+@validate_custom_parameters(
+    required_params=[("custom_task_attribute", "Called with run")],
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+)
+@validate_code_level_metrics("_target_application", "add_with_run")
+@validate_transaction_count(1)
+def test_celery_task_delay_custom_run():
+    """
+    Executes task on worker process and returns an AsyncResult.
+    """
+    result = add_with_run.delay(3, 4)
+    result = result.get()
+    assert result == 7
+
+
+@validate_custom_parameters(
+    required_params=[("custom_task_attribute", "Called with run")],
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+)
+@validate_code_level_metrics("_target_application", "add_with_run")
+@validate_transaction_count(1)
+def test_celery_task_apply_async_custom_run():
+    """
+    Executes task on worker process and returns an AsyncResult.
+    """
+    result = add_with_run.apply_async((3, 4))
+    result = result.get()
+    assert result == 7
+
+
+@validate_transaction_metrics(
+    name="_target_application.add",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add", None)],
+    rollup_metrics=[("Function/_target_application.add", None)],
+    background_task=True,
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+    index=-2,
+)
+@validate_code_level_metrics("_target_application", "add")
+@validate_code_level_metrics("_target_application", "add_with_run", index=-2)
+@validate_transaction_count(2)
+def test_celery_task_different_processes():
+    """
+    Executes two tasks, one in the main process and one in a worker process.
+    """
+    result = add_with_run.delay(3, 4)
+    result = result.get()
+    assert result == 7
+
+    result = add.apply((3, 4))
+    result = result.get()
+    assert result == 7
+
+
+@validate_transaction_metrics(
+    name="test_task_methods:test_celery_task_in_transaction_different_processes",
+    scoped_metrics=[("Function/_target_application.add", 1)],
+    rollup_metrics=[("Function/_target_application.add", 1)],
+    background_task=True,
+)
+@validate_transaction_metrics(
+    name="_target_application.add_with_run",
+    group="Celery",
+    scoped_metrics=[("Function/_target_application.add_with_run", None)],
+    rollup_metrics=[("Function/_target_application.add_with_run", None)],
+    background_task=True,
+    index=-2,
+)
+@validate_code_level_metrics("_target_application", "add")
+@validate_code_level_metrics("_target_application", "add_with_run", index=-2)
+@validate_transaction_count(2)
+@background_task()
+def test_celery_task_in_transaction_different_processes():
+    """
+    This test demonstrates the limitations of the agent with regard
+    to multiprocessing, namely when it involves transactions in a main
+    process and worker processes.
+
+    Two tasks are executed, one in the main process and one in a worker
+    process. A transaction (background task) is created in the context
+    of the main process. The worker process will not be aware of this
+    transaction.
+
+    The result is two transactions:
+    1. Main process: Background task transaction with `add` task as a trace
+    2. Worker process: `add_with_run` transaction with no additional traces
+    """
+    result = add_with_run.delay(3, 4)
+    result = result.get()
+    assert result == 7
+
+    result = add.apply((3, 4))
+    result = result.get()
+    assert result == 7
+
" + "This indicates that the monkeypatching of celery has not been applied correctly." + ) diff --git a/tests/framework_azurefunctions/sample_application/function_app.py b/tests/framework_azurefunctions/sample_application/function_app.py index 18f1e44d68..5079c0f9d6 100644 --- a/tests/framework_azurefunctions/sample_application/function_app.py +++ b/tests/framework_azurefunctions/sample_application/function_app.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import azure.functions # noqa: E402; pylint: disable=E0401 +import azure.functions # pylint: disable=E0401 app = azure.functions.FunctionApp(http_auth_level=azure.functions.AuthLevel.ANONYMOUS)