forked from CenterForOpenScience/SHARE
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery.py
More file actions
120 lines (96 loc) · 3.72 KB
/
celery.py
File metadata and controls
120 lines (96 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from functools import reduce
import operator
import pprint
import uuid
from celery import states
from django.db import models
from django.contrib import admin
from django.utils.html import format_html
from project import celery_app
from share.admin.util import TimeLimitedPaginator
class TaskNameFilter(admin.SimpleListFilter):
title = 'Task'
parameter_name = 'task_name'
def lookups(self, request, model_admin):
celery_app.autodiscover_tasks([
'share',
'share.janitor',
], force=True)
return sorted((x, x) for x in celery_app.tasks.keys())
def queryset(self, request, queryset):
if self.value():
return queryset.filter(task_name=self.value())
return queryset
class StatusFilter(admin.SimpleListFilter):
title = 'Status'
parameter_name = 'status'
def lookups(self, request, model_admin):
return sorted((x, x.title()) for x in states.ALL_STATES)
def queryset(self, request, queryset):
_value = self.value()
if _value:
return queryset.filter(status=_value.upper())
return queryset
class CeleryTaskResultAdmin(admin.ModelAdmin):
list_display = ('task_id', 'task_name', 'status_', 'source_', 'date_modified', 'date_created', 'share_version')
exclude = ('correlation_id', )
actions = ('retry', )
ordering = ('-date_modified', )
list_filter = (TaskNameFilter, StatusFilter, )
readonly_fields = (
'task_id',
'task_name',
'task_args', 'task_kwargs',
'result', 'traceback',
'meta_',
'date_created', 'date_modified',
'share_version'
)
show_full_result_count = False
paginator = TimeLimitedPaginator
search_fields = ('task_name', )
STATUS_COLORS = {
states.SUCCESS: 'green',
states.FAILURE: 'red',
states.STARTED: 'cyan',
states.RETRY: 'orange',
}
def get_search_results(self, request, queryset, search_term):
try:
return queryset.filter(task_id=uuid.UUID(search_term)), False
except ValueError:
pass
# Overriden because there is no way to opt out of a case insensitive search
search_fields = self.get_search_fields(request)
use_distinct = bool(search_term)
if search_fields and search_term:
orm_lookups = ['{}__startswith'.format(search_field) for search_field in search_fields]
for bit in search_term.split():
or_queries = [models.Q(**{orm_lookup: bit}) for orm_lookup in orm_lookups]
queryset = queryset.filter(reduce(operator.or_, or_queries))
return queryset, use_distinct
def task_args(self, obj):
return obj.meta['args']
def task_kwargs(self, obj):
return pprint.pformat(obj.meta['kwargs'])
def status_(self, obj):
return format_html(
'<span style="font-weight: bold; color: {}">{}</span>',
self.STATUS_COLORS.get(obj.status, 'black'),
obj.status.title()
)
status_.short_description = 'Status' # type: ignore[attr-defined]
def meta_(self, obj):
return pprint.pformat(obj.meta)
meta_.short_description = 'Meta' # type: ignore[attr-defined]
def source_(self, obj):
return obj.meta.get('source_config') or obj.meta.get('source')
source_.short_description = 'Source' # type: ignore[attr-defined]
def retry(self, request, queryset):
for task in queryset:
celery_app.tasks[task.task_name].apply_async(
task.meta.get('args', ()),
task.meta.get('kwargs', {}),
task_id=str(task.task_id)
)
retry.short_description = 'Retry Tasks' # type: ignore[attr-defined]