Skip to content

Commit cbaf953

Browse files
committed
delete failed task results eventually
1 parent a6952ab commit cbaf953

File tree

3 files changed

+92
-53
lines changed

3 files changed

+92
-53
lines changed

project/settings.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,17 @@ def split(string, delim):
341341
},
342342
}
343343

344-
CELERY_RESULT_EXPIRES = 60 * 60 * 24 * 3 # 4 days
345344
CELERY_RESULT_BACKEND = 'share.celery:CeleryDatabaseBackend'
345+
CELERY_RESULT_EXPIRES = int(os.environ.get(
346+
'CELERY_RESULT_EXPIRES',
347+
60 * 60 * 24 * 3, # 3 days
348+
))
349+
# only successful tasks get the default expiration (above)
350+
# -- tasks in any other status (not just failures) are kept longer (see `share.celery`)
351+
FAILED_CELERY_RESULT_EXPIRES = int(os.environ.get(
352+
'FAILED_CELERY_RESULT_EXPIRES',
353+
60 * 60 * 24 * 11, # 11 days
354+
))
346355

347356
# Don't reject tasks that were present on a worker when it was killed
348357
CELERY_TASK_REJECT_ON_WORKER_LOST = False

share/celery.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
import datetime
12
import functools
23
import logging
34

4-
55
from celery import states
66
from celery.app.task import Context
77
from celery.backends.base import BaseDictBackend
88
from celery.utils.time import maybe_timedelta
99

10+
from django.conf import settings
1011
from django.db import transaction
12+
from django.db.models import Q
1113
from django.utils import timezone
1214

1315
import sentry_sdk
@@ -90,7 +92,10 @@ def _store_result(self, task_id, result, status, traceback=None, request=None, *
9092

9193
@die_on_unhandled
9294
def cleanup(self, expires=None):
93-
TaskResultCleaner(expires or self.expires).clean()
95+
TaskResultCleaner(
96+
success_ttl=(expires or self.expires),
97+
nonsuccess_ttl=settings.FAILED_CELERY_RESULT_EXPIRES,
98+
).clean()
9499

95100
@die_on_unhandled
96101
def _get_task_meta_for(self, task_id):
@@ -111,20 +116,19 @@ class TaskResultCleaner:
111116

112117
TaskModel = CeleryTaskResult
113118

114-
TASK_TTLS = {
115-
}
116-
117-
NO_ARCHIVE = {
118-
}
119-
120-
def __init__(self, expires, bucket=None, delete=True, chunk_size=5000):
121-
self.bucket = bucket
119+
def __init__(self, success_ttl, nonsuccess_ttl=None, delete=True, chunk_size=5000):
122120
self.chunk_size = chunk_size
123121
self.delete = delete
124-
self.expires = expires
122+
self.success_ttl = success_ttl
123+
self.nonsuccess_ttl = nonsuccess_ttl or success_ttl
125124

126-
def get_ttl(self, task_name):
127-
return timezone.now() - maybe_timedelta(self.TASK_TTLS.get(task_name, self.expires))
125+
@property
126+
def success_cutoff(self) -> datetime.datetime:
127+
return timezone.now() - maybe_timedelta(self.success_ttl)
128+
129+
@property
130+
def nonsuccess_cutoff(self) -> datetime.datetime:
131+
return timezone.now() - maybe_timedelta(self.nonsuccess_ttl)
128132

129133
def get_task_names(self):
130134
qs = self.TaskModel.objects.values('task_name').annotate(name=GroupBy('task_name'))
@@ -137,12 +141,15 @@ def get_task_names(self):
137141

138142
def clean(self):
139143
for name in self.get_task_names():
140-
logger.debug('Looking for succeeded %s tasks modified before %s', name, self.get_ttl(name))
141-
142-
queryset = self.TaskModel.objects.filter(
143-
task_name=name,
144-
status=states.SUCCESS,
145-
date_modified__lt=self.get_ttl(name)
144+
success_q = Q(status=states.SUCCESS, date_modified__lt=self.success_cutoff)
145+
nonsuccess_q = (
146+
~Q(status=states.SUCCESS)
147+
& Q(date_modified__lt=self.nonsuccess_cutoff)
148+
)
149+
queryset = (
150+
self.TaskModel.objects
151+
.filter(task_name=name)
152+
.filter(success_q | nonsuccess_q)
146153
)
147154

148155
if not queryset.exists():

tests/share/test_celery.py

Lines changed: 56 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,72 @@
1-
import pytest
2-
import datetime
3-
1+
import contextlib
2+
from datetime import timedelta
43
from unittest import mock
54

5+
import pytest
66
from django.utils import timezone
77

88
from share.celery import TaskResultCleaner, CeleryTaskResult
9-
109
from tests import factories
1110

1211

13-
@pytest.mark.usefixtures('nested_django_db')
14-
class TestResultArchiver:
12+
@contextlib.contextmanager
13+
def long_now(new_now=None):
14+
_now = new_now or timezone.now()
15+
with mock.patch.object(timezone, 'now', return_value=_now):
16+
yield _now
1517

16-
@pytest.fixture(scope='class', autouse=True)
17-
def task_result_data(self, class_scoped_django_db):
18-
return factories.CeleryTaskResultFactory.create_batch(100)
18+
19+
@pytest.mark.django_db
20+
class TestResultCleaner:
1921

2022
def test_delete_false(self):
21-
trc = TaskResultCleaner(datetime.timedelta(weeks=520), delete=False)
23+
factories.CeleryTaskResultFactory.create_batch(10)
24+
trc = TaskResultCleaner(timedelta(weeks=520), delete=False)
2225
assert trc.delete_queryset(CeleryTaskResult.objects.all()) == 0
23-
assert CeleryTaskResult.objects.count() != 0
26+
assert CeleryTaskResult.objects.count() == 10
2427

2528
def test_delete_queryset(self):
26-
trc = TaskResultCleaner(datetime.timedelta(weeks=520))
27-
assert trc.delete_queryset(CeleryTaskResult.objects.all()) == 100
29+
factories.CeleryTaskResultFactory.create_batch(10)
30+
trc = TaskResultCleaner(timedelta(weeks=520))
31+
assert trc.delete_queryset(CeleryTaskResult.objects.all()) == 10
2832
assert CeleryTaskResult.objects.count() == 0
2933

30-
def test_get_ttl_default(self):
31-
trc = TaskResultCleaner(datetime.timedelta(weeks=520))
32-
assert ((timezone.now() - datetime.timedelta(weeks=520)) - trc.get_ttl('non-existant-task')) < datetime.timedelta(seconds=2)
33-
34-
def test_get_ttl(self):
35-
trc = TaskResultCleaner(datetime.timedelta(weeks=520))
36-
trc.TASK_TTLS['existant-task'] = datetime.timedelta(days=1)
37-
assert ((timezone.now() - datetime.timedelta(days=1)) - trc.get_ttl('existant-task')) < datetime.timedelta(seconds=2)
38-
39-
def test_clean(self):
40-
trc = TaskResultCleaner(0, bucket=mock.Mock())
41-
factories.CeleryTaskResultFactory.create_batch(100, status='SUCCESS')
42-
trc.clean()
43-
assert CeleryTaskResult.objects.count() <= 100 # There's an autouse fixture that makes 100
44-
45-
def test_clean_chunksize(self):
46-
trc = TaskResultCleaner(0, bucket=mock.Mock(), chunk_size=1)
47-
factories.CeleryTaskResultFactory.create_batch(100, status='SUCCESS')
48-
trc.clean()
49-
assert CeleryTaskResult.objects.count() <= 100 # There's an autouse fixture that makes 100
34+
def test_success_cutoff(self, settings):
35+
with long_now() as _now:
36+
trc = TaskResultCleaner(timedelta(days=3).total_seconds())
37+
_expected = _now - timedelta(days=3)
38+
assert trc.success_cutoff == _expected
39+
40+
def test_nonsuccess_cutoff(self, settings):
41+
with long_now() as _now:
42+
trc = TaskResultCleaner(
43+
success_ttl=timedelta(days=3),
44+
nonsuccess_ttl=timedelta(days=5),
45+
)
46+
assert trc.success_cutoff == _now - timedelta(days=3)
47+
assert trc.nonsuccess_cutoff == _now - timedelta(days=5)
48+
49+
@pytest.mark.parametrize('batch_size', [1, 1111])
50+
def test_clean(self, batch_size):
51+
with long_now() as _now:
52+
with long_now(_now - timedelta(days=7)):
53+
# all should be deleted:
54+
factories.CeleryTaskResultFactory.create_batch(10, status='SUCCESS')
55+
factories.CeleryTaskResultFactory.create_batch(7, status='FAILED')
56+
with long_now(_now - timedelta(days=4)):
57+
# only the successes should be deleted (non-successes are still within their longer ttl):
58+
factories.CeleryTaskResultFactory.create_batch(10, status='SUCCESS')
59+
factories.CeleryTaskResultFactory.create_batch(7, status='FAILED')
60+
# none should be deleted:
61+
factories.CeleryTaskResultFactory.create_batch(10, status='SUCCESS')
62+
factories.CeleryTaskResultFactory.create_batch(7, status='FAILED')
63+
# end setup
64+
assert CeleryTaskResult.objects.count() == 51
65+
trc = TaskResultCleaner(
66+
success_ttl=timedelta(days=3),
67+
nonsuccess_ttl=timedelta(days=5),
68+
chunk_size=batch_size,
69+
)
70+
trc.clean()
71+
assert CeleryTaskResult.objects.filter(status='SUCCESS').count() == 10
72+
assert CeleryTaskResult.objects.exclude(status='SUCCESS').count() == 14

0 commit comments

Comments
 (0)