Celery (re)instrumentation #1429
Conversation
🦙 MegaLinter status: ✅ SUCCESS. See the detailed report in the MegaLinter reports.

Codecov Report: ❌ Patch coverage is …

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #1429      +/-   ##
==========================================
- Coverage   81.37%   81.31%   -0.07%
==========================================
  Files         208      208
  Lines       23566    23585      +19
  Branches     3717     3719       +2
==========================================
+ Hits        19176    19177       +1
- Misses       3132     3150      +18
  Partials     1258     1258
```
```python
from celery import Task

from newrelic.api.transaction import current_transaction


class CustomCeleryTaskWithSuper(Task):
    def __call__(self, *args, **kwargs):
        transaction = current_transaction()
        if transaction:
            transaction.add_custom_attribute("custom_task_attribute", "Called with super")
        return super().__call__(*args, **kwargs)


class CustomCeleryTaskWithRun(Task):
    def __call__(self, *args, **kwargs):
        transaction = current_transaction()
        if transaction:
            transaction.add_custom_attribute("custom_task_attribute", "Called with run")
        return self.run(*args, **kwargs)
```
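For context, tasks opt into a custom base class via Celery's `base=` option. A minimal hypothetical usage (the `app` instance and task name are assumptions, not taken from this PR):

```python
from celery import Celery

app = Celery("tasks", broker="memory://", backend="cache+memory://")


@app.task(base=CustomCeleryTaskWithSuper)
def add_with_super(x, y):
    # Executing this task routes through CustomCeleryTaskWithSuper.__call__,
    # which tags the active transaction before deferring to Task.__call__.
    return x + y
```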
Nitpick, can we move these (and the tasks that use them) to the bottom of the file so the test app's functionality escalates in complexity rather than starting with the most confusing things?
newrelic/hooks/application_celery.py
```python
task = bound_args.get("task", None)

task = TaskWrapper(task, wrap_task_call)
task.__module__ = wrapped.__module__  # Ensure module is set for monkeypatching detection
```
I'm not sure I understand what this is doing, and the comment is insufficient. From inspecting execution, `task.__module__ = 'celery.app.builtins'` and `wrapped.__module__ = 'celery.app.trace'`.

"Ensure the module is set" doesn't really clarify what it's being set to, and "for monkeypatching detection" is confusing. What is doing the detecting, and why is that important?

I disabled this line and the tests pass 100%, so if there's some reason to be doing this we need a test for it.
Defining the `TaskWrapper` so that its `run()` method just calls the existing `__call__` method covers the same case that overriding the `__module__` attribute covers. I believe this was a way to optimize functionality.

Take the scenario where the `__call__` method has not been overridden and the `run()` method has been defined: without overriding the `__module__` attribute, the task tracer will simply use the `run()` method and bypass the `__call__` function, and the `TaskWrapper` class will then reroute that back to the `__call__` method. When the `__module__` attribute is overridden, the task tracer will see that the `__call__` method has been overridden (which it determines by traversing the MRO) and use that instead.
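A minimal sketch of that MRO traversal, under a hypothetical helper name (illustrative only, not the agent's actual implementation):

```python
from celery.app.task import Task


def overrides_dunder_call(task):
    """Return True if the task's class hierarchy overrides __call__."""
    for klass in type(task).__mro__:
        if "__call__" in vars(klass):
            # The first class in the MRO that defines __call__ is the one
            # used at runtime; if it isn't Celery's base Task, the user
            # (or an instrumented subclass) has overridden it.
            return klass is not Task
    return False
```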
That being said, whoops. That is supposed to be `task.__module__ = wrap_task_call.__module__` (so the wrapper function instead of the wrapped function).

I figured it would be faster execution to keep that line in there, but I don't have strong feelings about removing it either.
```python
_process_module_definition("celery.task.base", "newrelic.hooks.application_celery", "instrument_celery_app_task")
_process_module_definition("celery.app.task", "newrelic.hooks.application_celery", "instrument_celery_app_task")
```
I presume that removing these hooks drops support for some older versions of Celery outside our support window, but let's be sure to call that out in the release notes, as I'm sure some customers out there use ancient versions of Celery.
```python
    result = result.get()
    assert result == 3


@pytest.mark.parametrize("dt_enabled", [True, False])
def test_celery_task_distributed_tracing_inside_background_task(dt_enabled):
```
The names of all these tests in this file have gotten so long they don't actually describe what they do. Can we shorten all of these and clarify what we're actually testing? Something like this, maybe, to trim out as many words as possible:

```diff
-def test_celery_task_distributed_tracing_inside_background_task(dt_enabled):
+def test_distributed_tracing_inside_txn_apply_async(dt_enabled):
```
```python
# In this case, the background task creating the transaction
# has not generated a distributed trace header, so the Celery
# task will not have a distributed trace header to accept.
```
This comment and the test don't make sense to me. This test is for Celery's `.apply()` method, which is immediate, local execution of the task, no? So the reason there are no distributed tracing headers to accept is that this isn't a distributed system, and Celery isn't handling the transaction?

If so, we should validate that no DT headers are accepted here, with forgone metrics, by setting the count to `None` like the above tests do when DT is disabled.
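For illustration, a hedged sketch of what that forgone-metric check might look like, using the `validate_transaction_metrics` fixture from the agent's test suite; the transaction name and supportability metric shown are assumptions:

```python
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics

from newrelic.api.background_task import background_task


@validate_transaction_metrics(
    "test_celery:_test",
    rollup_metrics=[
        # A count of None marks the metric as forgone: the test fails if
        # the agent records it, i.e. if any DT payload was accepted.
        ("Supportability/DistributedTrace/AcceptPayload/Success", None)
    ],
    background_task=True,
)
@background_task(name="test_celery:_test")
def _test():
    result = add.apply((1, 2))
    assert result.get() == 3
```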
If all of that is true, why is the test after this seeing DT headers and accepting them? Isn't that also local execution? Where is it even getting these headers from to accept?
I've added a separate test for this, where a task is called within a transaction (a background task) that calls `insert_distributed_trace_headers()`.

Basically, `delay`/`apply_async` create worker processes, while `apply`/`__call__` use the main process to execute. In the case of a task being called in a function that is wrapped with the `background_task` decorator, any tasks called with `delay`/`apply_async` will create a worker process to execute the task and will no longer be in the context of the transaction. Conversely, any tasks called with `apply`/`__call__` will execute in the main process and will be a function trace of the existing transaction.
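To make the distinction concrete, a small illustrative sketch (the app setup and task definition are assumptions):

```python
from celery import Celery

from newrelic.api.background_task import background_task

app = Celery("tasks", broker="memory://", backend="cache+memory://")


@app.task
def add(x, y):
    return x + y


@background_task()
def caller():
    add.delay(1, 2)          # worker process: runs outside this transaction
    add.apply_async((1, 2))  # worker process: same as delay, with more options
    add.apply((1, 2))        # main process: a function trace in this transaction
    add(1, 2)                # direct __call__: also runs in the main process
```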
```python
    )
    @validate_transaction_count(1)  # In the same process, so only one transaction
    def _test():
        result = add.apply((1, 2))
```
Additional thought: can't you call tasks directly, like functions? If that's not different execution-wise from calling `.apply()`, shouldn't we test that here as well, the same as we test `.apply()`?
Good point, I meant to add a direct call test as well. The execution path is different from `.apply()` even though they both are executed on the main process.
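A hypothetical shape for that direct-call test, reusing the `add` task and the `validate_transaction_count` fixture from the surrounding file (the test name, and the assumption that a direct `__call__` starts its own transaction the way `.apply()` does, are mine):

```python
from testing_support.validators.validate_transaction_count import validate_transaction_count


@validate_transaction_count(1)  # direct __call__ stays in-process: one transaction
def test_celery_task_direct_call():
    result = add(1, 2)  # plain function-style invocation, not .apply()
    assert result == 3
```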
This PR picks new instrumentation points for Celery to support custom Task classes.