Skip to content

Commit 0d80d29

Browse files
heedong-jungheedongsamsung
authored andcommitted
Clean TraceBack to reduce memory leaks for exception task (celery#6024)
* Clean TraceBack to reduce memory leaks * add unit test * add unit test * reject unittest * Patch For Python 2.7 compatibility * update unittest * Register to the garbage collector by explicitly referring to f_locals. * need more check * update code coverage * update Missing unit test * 3.4 -> 3.5 Co-authored-by: heedong.jung <heedong.jung@samsung.com>
1 parent f03ede7 commit 0d80d29

File tree

4 files changed

+135
-8
lines changed

4 files changed

+135
-8
lines changed

celery/app/trace.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,8 @@
8282
"""
8383

8484
log_policy_t = namedtuple(
85-
'log_policy_t', ('format', 'description', 'severity', 'traceback', 'mail'),
85+
'log_policy_t',
86+
('format', 'description', 'severity', 'traceback', 'mail'),
8687
)
8788

8889
log_policy_reject = log_policy_t(LOG_REJECTED, 'rejected', logging.WARN, 1, 1)
@@ -256,6 +257,31 @@ def _log_error(self, task, req, einfo):
256257
extra={'data': context})
257258

258259

260+
def traceback_clear(exc=None):
261+
# Cleared Tb, but einfo still has a reference to Traceback.
262+
# exc cleans up the Traceback at the last moment that can be revealed.
263+
tb = None
264+
if exc is not None:
265+
if hasattr(exc, '__traceback__'):
266+
tb = exc.__traceback__
267+
else:
268+
_, _, tb = sys.exc_info()
269+
else:
270+
_, _, tb = sys.exc_info()
271+
272+
if sys.version_info >= (3, 5, 0):
273+
while tb is not None:
274+
try:
275+
tb.tb_frame.clear()
276+
tb.tb_frame.f_locals
277+
except RuntimeError:
278+
# Ignore the exception raised if the frame is still executing.
279+
pass
280+
tb = tb.tb_next
281+
282+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
283+
sys.exc_clear()
284+
259285
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
260286
Info=TraceInfo, eager=False, propagate=False, app=None,
261287
monotonic=monotonic, trace_ok_t=trace_ok_t,
@@ -385,16 +411,20 @@ def trace_task(uuid, args, kwargs, request=None):
385411
I, R = Info(REJECTED, exc), ExceptionInfo(internal=True)
386412
state, retval = I.state, I.retval
387413
I.handle_reject(task, task_request)
414+
traceback_clear(exc)
388415
except Ignore as exc:
389416
I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
390417
state, retval = I.state, I.retval
391418
I.handle_ignore(task, task_request)
419+
traceback_clear(exc)
392420
except Retry as exc:
393421
I, R, state, retval = on_error(
394422
task_request, exc, uuid, RETRY, call_errbacks=False)
423+
traceback_clear(exc)
395424
except Exception as exc:
396425
I, R, state, retval = on_error(task_request, exc, uuid)
397-
except BaseException as exc:
426+
traceback_clear(exc)
427+
except BaseException:
398428
raise
399429
else:
400430
try:

celery/backends/base.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,19 @@ def fail_from_current_stack(self, task_id, exc=None):
229229
self.mark_as_failure(task_id, exc, ei.traceback)
230230
return ei
231231
finally:
232+
if sys.version_info >= (3, 5, 0):
233+
while tb is not None:
234+
try:
235+
tb.tb_frame.clear()
236+
tb.tb_frame.f_locals
237+
except RuntimeError:
238+
# Ignore the exception raised if the frame is still executing.
239+
pass
240+
tb = tb.tb_next
241+
242+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
243+
sys.exc_clear()
244+
232245
del tb
233246

234247
def prepare_exception(self, exc, serializer=None):

t/unit/backends/test_base.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,22 @@ def test_reload_task_result(self):
285285
self.b.reload_task_result('task-exists')
286286
self.b._cache['task-exists'] = {'result': 'task'}
287287

288+
288289
def test_fail_from_current_stack(self):
290+
import inspect
289291
self.b.mark_as_failure = Mock()
290-
try:
292+
frame_list = []
293+
294+
if (2, 7, 0) <= sys.version_info < (3, 0, 0):
295+
sys.exc_clear = Mock()
296+
297+
def raise_dummy():
298+
frame_str_temp = str(inspect.currentframe().__repr__)
299+
frame_list.append(frame_str_temp)
300+
local_value = 1214
291301
raise KeyError('foo')
302+
try:
303+
raise_dummy()
292304
except KeyError as exc:
293305
self.b.fail_from_current_stack('task_id')
294306
self.b.mark_as_failure.assert_called()
@@ -297,6 +309,16 @@ def test_fail_from_current_stack(self):
297309
assert args[1] is exc
298310
assert args[2]
299311

312+
if sys.version_info >= (3, 5, 0):
313+
tb_ = exc.__traceback__
314+
while tb_ is not None:
315+
if str(tb_.tb_frame.__repr__) == frame_list[0]:
316+
assert len(tb_.tb_frame.f_locals) == 0
317+
tb_ = tb_.tb_next
318+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
319+
sys.exc_clear.assert_called()
320+
321+
300322
def test_prepare_value_serializes_group_result(self):
301323
self.b.serializer = 'json'
302324
g = self.app.GroupResult('group_id', [self.app.AsyncResult('foo')])

t/unit/tasks/test_trace.py

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
log_policy_internal, log_policy_reject,
1313
log_policy_unexpected,
1414
reset_worker_optimizations,
15-
setup_worker_optimizations, trace_task)
15+
setup_worker_optimizations, trace_task,
16+
traceback_clear)
17+
1618
from celery.exceptions import Ignore, Reject, Retry
1719

1820

@@ -150,23 +152,79 @@ def add(x, y):
150152
with pytest.raises(MemoryError):
151153
self.trace(add, (2, 2), {}, eager=False)
152154

153-
def test_when_Ignore(self):
155+
def test_traceback_clear(self):
156+
import inspect, sys
157+
sys.exc_clear = Mock()
158+
frame_list =[]
159+
160+
def raise_dummy():
161+
frame_str_temp = str(inspect.currentframe().__repr__)
162+
frame_list.append(frame_str_temp)
163+
local_value = 1214
164+
raise KeyError('foo')
165+
try:
166+
raise_dummy()
167+
except KeyError as exc:
168+
traceback_clear(exc)
169+
170+
if sys.version_info >= (3, 5, 0):
171+
tb_ = exc.__traceback__
172+
while tb_ is not None:
173+
if str(tb_.tb_frame.__repr__) == frame_list[0]:
174+
assert len(tb_.tb_frame.f_locals) == 0
175+
tb_ = tb_.tb_next
176+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
177+
sys.exc_clear.assert_called()
178+
179+
try:
180+
raise_dummy()
181+
except KeyError as exc:
182+
traceback_clear()
183+
184+
if sys.version_info >= (3, 5, 0):
185+
tb_ = exc.__traceback__
186+
while tb_ is not None:
187+
if str(tb_.tb_frame.__repr__) == frame_list[0]:
188+
assert len(tb_.tb_frame.f_locals) == 0
189+
tb_ = tb_.tb_next
190+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
191+
sys.exc_clear.assert_called()
192+
193+
try:
194+
raise_dummy()
195+
except KeyError as exc:
196+
traceback_clear(str(exc))
197+
198+
if sys.version_info >= (3, 5, 0):
199+
tb_ = exc.__traceback__
200+
while tb_ is not None:
201+
if str(tb_.tb_frame.__repr__) == frame_list[0]:
202+
assert len(tb_.tb_frame.f_locals) == 0
203+
tb_ = tb_.tb_next
204+
elif (2, 7, 0) <= sys.version_info < (3, 0, 0):
205+
sys.exc_clear.assert_called()
206+
207+
@patch('celery.app.trace.traceback_clear')
208+
def test_when_Ignore(self, mock_traceback_clear):
154209

155210
@self.app.task(shared=False)
156211
def ignored():
157212
raise Ignore()
158213

159214
retval, info = self.trace(ignored, (), {})
160215
assert info.state == states.IGNORED
216+
mock_traceback_clear.assert_called()
161217

162-
def test_when_Reject(self):
218+
@patch('celery.app.trace.traceback_clear')
219+
def test_when_Reject(self, mock_traceback_clear):
163220

164221
@self.app.task(shared=False)
165222
def rejecting():
166223
raise Reject()
167224

168225
retval, info = self.trace(rejecting, (), {})
169226
assert info.state == states.REJECTED
227+
mock_traceback_clear.assert_called()
170228

171229
def test_backend_cleanup_raises(self):
172230
self.add.backend.process_cleanup = Mock()
@@ -248,17 +306,21 @@ def test_trace_SystemExit(self):
248306
with pytest.raises(SystemExit):
249307
self.trace(self.raises, (SystemExit(),), {})
250308

251-
def test_trace_Retry(self):
309+
@patch('celery.app.trace.traceback_clear')
310+
def test_trace_Retry(self, mock_traceback_clear):
252311
exc = Retry('foo', 'bar')
253312
_, info = self.trace(self.raises, (exc,), {})
254313
assert info.state == states.RETRY
255314
assert info.retval is exc
315+
mock_traceback_clear.assert_called()
256316

257-
def test_trace_exception(self):
317+
@patch('celery.app.trace.traceback_clear')
318+
def test_trace_exception(self, mock_traceback_clear):
258319
exc = KeyError('foo')
259320
_, info = self.trace(self.raises, (exc,), {})
260321
assert info.state == states.FAILURE
261322
assert info.retval is exc
323+
mock_traceback_clear.assert_called()
262324

263325
def test_trace_task_ret__no_content_type(self):
264326
_trace_task_ret(

0 commit comments

Comments
 (0)