28
28
from newrelic .api .message_trace import MessageTrace
29
29
from newrelic .api .pre_function import wrap_pre_function
30
30
from newrelic .api .transaction import current_transaction
31
- from newrelic .common .object_wrapper import FunctionWrapper , _NRBoundFunctionWrapper , wrap_function_wrapper
31
+ from newrelic .common .object_wrapper import FunctionWrapper , wrap_function_wrapper
32
32
from newrelic .common .signature import bind_args
33
33
from newrelic .core .agent import shutdown_agent
34
34
35
35
UNKNOWN_TASK_NAME = "<Unknown Task>"
36
36
MAPPING_TASK_NAMES = {"celery.starmap" , "celery.map" }
37
37
38
+
38
39
def task_info (instance , * args , ** kwargs ):
39
40
# Grab the current task, which can be located in either place
40
41
if instance :
@@ -129,22 +130,22 @@ def wrap_task_call(wrapped, instance, args, kwargs):
129
130
except Exception :
130
131
pass
131
132
132
- return wrapped (* args , ** kwargs )
133
+ return wrapped (* args , ** kwargs )
133
134
134
135
135
136
def wrap_build_tracer (wrapped , instance , args , kwargs ):
136
137
class TaskWrapper (FunctionWrapper ):
137
138
def run (self , * args , ** kwargs ):
138
139
return self .__call__ (* args , ** kwargs )
139
-
140
+
140
141
try :
141
142
bound_args = bind_args (wrapped , args , kwargs )
142
143
task = bound_args .get ("task" , None )
143
144
144
145
task = TaskWrapper (task , wrap_task_call )
145
146
task .__module__ = wrapped .__module__ # Ensure module is set for monkeypatching detection
146
147
bound_args ["task" ] = task
147
-
148
+
148
149
return wrapped (** bound_args )
149
150
except :
150
151
# If we can't bind the args, we just call the wrapped function
@@ -259,15 +260,15 @@ def run(self, *args, **kwargs):
259
260
260
261
def instrument_celery_local (module ):
261
262
if hasattr (module , "Proxy" ):
262
- # This is used in the case where the function is
263
- # called directly on the Proxy object (rather than
263
+ # This is used in the case where the function is
264
+ # called directly on the Proxy object (rather than
264
265
# using "delay" or "apply_async")
265
266
module .Proxy .__call__ = CeleryTaskWrapper (module .Proxy .__call__ )
266
267
267
268
268
269
def instrument_celery_worker (module ):
269
270
if hasattr (module , "process_initializer" ):
270
- # We try and force activation of the agent before
271
+ # We try and force activation of the agent before
271
272
# the worker process starts.
272
273
_process_initializer = module .process_initializer
273
274
@@ -277,8 +278,7 @@ def process_initializer(*args, **kwargs):
277
278
return _process_initializer (* args , ** kwargs )
278
279
279
280
module .process_initializer = process_initializer
280
-
281
-
281
+
282
282
if hasattr (module , "process_destructor" ):
283
283
# We try and force shutdown of the agent before
284
284
# the worker process exits.
@@ -305,10 +305,8 @@ def force_agent_shutdown(*args, **kwargs):
305
305
306
306
if hasattr (module , "Worker" ) and hasattr (module .Worker , "_do_exit" ):
307
307
wrap_pre_function (module , "Worker._do_exit" , force_agent_shutdown )
308
-
308
+
309
309
310
310
def instrument_celery_app_trace (module ):
311
311
if hasattr (module , "build_tracer" ):
312
312
wrap_function_wrapper (module , "build_tracer" , wrap_build_tracer )
313
-
314
-
0 commit comments