Skip to content

Commit b7fa0ec

Browse files
committed
Fix remaining tests and add custom task tests
1 parent faabca7 commit b7fa0ec

File tree

5 files changed

+203
-159
lines changed

5 files changed

+203
-159
lines changed

newrelic/hooks/application_celery.py

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -264,51 +264,6 @@ def run(self, *args, **kwargs):
264264
wrapped_task.__module__ = CeleryTaskWrapper.__module__
265265

266266
return wrapped_task
267-
268-
269-
# # This will not work with the current version of Celery
270-
# # This only gets called during the async execution of a task
271-
# # and the task is wrapped later in the process to accomodate
272-
# # custom task classes.
273-
# def wrap_Celery_send_task(wrapped, instance, args, kwargs):
274-
# transaction = current_transaction()
275-
# if not transaction:
276-
# return wrapped(*args, **kwargs)
277-
278-
# # Merge distributed tracing headers into outgoing task headers
279-
# try:
280-
# dt_headers = MessageTrace.generate_request_headers(transaction)
281-
# original_headers = kwargs.get("headers", None)
282-
# if dt_headers:
283-
# if not original_headers:
284-
# kwargs["headers"] = dict(dt_headers)
285-
# else:
286-
# kwargs["headers"] = dt_headers = dict(dt_headers)
287-
# dt_headers.update(dict(original_headers))
288-
# except Exception:
289-
# pass
290-
291-
# return wrapped(*args, **kwargs)
292-
293-
294-
def wrap_worker_optimizations(wrapped, instance, args, kwargs):
295-
# Attempt to uninstrument BaseTask before stack protection is installed or uninstalled
296-
try:
297-
from celery.app.task import BaseTask
298-
299-
if isinstance(BaseTask.__call__, _NRBoundFunctionWrapper):
300-
BaseTask.__call__ = BaseTask.__call__.__wrapped__
301-
except Exception:
302-
BaseTask = None
303-
304-
# Allow metaprogramming to run
305-
result = wrapped(*args, **kwargs)
306-
307-
# Rewrap finalized BaseTask
308-
if BaseTask: # Ensure imports succeeded
309-
BaseTask.__call__ = CeleryTaskWrapper(BaseTask.__call__)
310-
311-
return result
312267

313268

314269
def instrument_celery_local(module):
@@ -319,11 +274,6 @@ def instrument_celery_local(module):
319274
module.Proxy.__call__ = CeleryTaskWrapper(module.Proxy.__call__)
320275

321276

322-
# def instrument_celery_app_base(module):
323-
# if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
324-
# wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
325-
326-
327277
def instrument_celery_worker(module):
328278
if hasattr(module, "process_initializer"):
329279
# We try and force activation of the agent before
@@ -367,13 +317,6 @@ def force_agent_shutdown(*args, **kwargs):
367317

368318

369319
def instrument_celery_app_trace(module):
370-
# Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
371-
if hasattr(module, "setup_worker_optimizations"):
372-
wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations)
373-
374-
if hasattr(module, "reset_worker_optimizations"):
375-
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)
376-
377320
if hasattr(module, "build_tracer"):
378321
wrap_function_wrapper(module, "build_tracer", wrap_build_tracer)
379322

tests/application_celery/_target_application.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from celery import Celery, shared_task
16-
from testing_support.validators.validate_distributed_trace_accepted import validate_distributed_trace_accepted
15+
from celery import Celery, Task, shared_task
1716

1817
from newrelic.api.transaction import current_transaction
1918

@@ -26,12 +25,36 @@
2625
broker_heartbeat=0,
2726
)
2827

28+
class CustomCeleryTaskWithSuper(Task):
29+
def __call__(self, *args, **kwargs):
30+
transaction = current_transaction()
31+
if transaction:
32+
transaction.add_custom_attribute("custom_task_attribute", "Called with super")
33+
return super().__call__(*args, **kwargs)
34+
35+
class CustomCeleryTaskWithRun(Task):
36+
def __call__(self, *args, **kwargs):
37+
transaction = current_transaction()
38+
if transaction:
39+
transaction.add_custom_attribute("custom_task_attribute", "Called with run")
40+
return self.run(*args, **kwargs)
41+
2942

3043
@app.task
3144
def add(x, y):
3245
return x + y
3346

3447

48+
@app.task(base=CustomCeleryTaskWithSuper)
49+
def add_with_super(x, y):
50+
return x + y
51+
52+
53+
@app.task(base=CustomCeleryTaskWithRun)
54+
def add_with_run(x, y):
55+
return x + y
56+
57+
3558
@app.task
3659
def tsum(nums):
3760
return sum(nums)
@@ -45,13 +68,3 @@ def nested_add(x, y):
4568
@shared_task
4669
def shared_task_add(x, y):
4770
return x + y
48-
49-
50-
@app.task
51-
@validate_distributed_trace_accepted(transport_type="AMQP")
52-
def assert_dt():
53-
# Basic checks for DT delegated to task
54-
txn = current_transaction()
55-
assert txn, "No transaction active."
56-
assert txn.name == "_target_application.assert_dt", f"Transaction name does not match: {txn.name}"
57-
return 1

tests/application_celery/test_application.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@
2424
@validate_transaction_metrics(
2525
name="test_application:test_celery_task_as_function_trace",
2626
scoped_metrics=[("Function/_target_application.add", 1)],
27-
# scoped_metrics=[("Function/_target_application.add", 2)],
2827
background_task=True,
2928
)
3029
@validate_code_level_metrics("_target_application", "add")
31-
# @validate_code_level_metrics("_target_application", "add", count=2)
3230
@background_task()
3331
def test_celery_task_as_function_trace():
3432
"""

0 commit comments

Comments
 (0)