Skip to content

Commit 1dfcd7a

Browse files
authored
Update the celery task result table creation logic (#783)
* Update the celery task result table creation logic * Disable beat_sync_every config * Update the prepared comment
1 parent 2b28244 commit 1dfcd7a

File tree

7 files changed

+304
-17
lines changed

7 files changed

+304
-17
lines changed

backend/app/task/celery.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import celery
66
import celery_aio_pool
77

8-
from backend.app.task.model.result import OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME, OVERWRITE_CELERY_RESULT_TABLE_NAME
98
from backend.app.task.tasks.beat import LOCAL_BEAT_SCHEDULE
109
from backend.core.conf import settings
1110
from backend.core.path_conf import BASE_PATH
@@ -30,22 +29,19 @@ def init_celery() -> celery.Celery:
3029
celery.app.trace.build_tracer = celery_aio_pool.build_async_tracer
3130
celery.app.trace.reset_worker_optimizations()
3231

32+
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
3333
app = celery.Celery(
3434
'fba_celery',
35-
broker=f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
35+
broker_url=f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
3636
if settings.CELERY_BROKER == 'redis'
3737
else f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}',
3838
broker_connection_retry_on_startup=True,
39-
backend=f'db+{settings.DATABASE_TYPE}+{"pymysql" if settings.DATABASE_TYPE == "mysql" else "psycopg"}'
39+
result_backend=f'db+{settings.DATABASE_TYPE}+{"pymysql" if settings.DATABASE_TYPE == "mysql" else "psycopg"}'
4040
f'://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_SCHEMA}',
41-
database_engine_options={'echo': settings.DATABASE_ECHO},
42-
database_table_names={
43-
'task': OVERWRITE_CELERY_RESULT_TABLE_NAME,
44-
'group': OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME,
45-
},
4641
result_extended=True,
47-
# result_expires=0, # 清理任务结果,默认每天凌晨 4 点,0 或 None 表示不清理
48-
# beat_sync_every=1, # 保存任务状态周期,默认 3 * 60 秒
42+
database_engine_options={'echo': settings.DATABASE_ECHO},
43+
# result_expires=0,
44+
# beat_sync_every=1,
4945
beat_schedule=LOCAL_BEAT_SCHEDULE,
5046
beat_scheduler='backend.app.task.utils.schedulers:DatabaseScheduler',
5147
task_cls='backend.app.task.tasks.base:TaskBase',
@@ -54,6 +50,10 @@ def init_celery() -> celery.Celery:
5450
timezone=settings.DATETIME_TIMEZONE,
5551
)
5652

53+
# Setting this parameter through the Celery configuration has no effect
54+
# Reference: https://github.com/celery/celery/issues/7270
55+
app.loader.override_backends = {'db': 'backend.app.task.database:DatabaseBackend'}
56+
5757
# 自动发现任务
5858
packages = find_task_packages()
5959
app.autodiscover_tasks(packages)

backend/app/task/crud/crud_result.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from sqlalchemy.ext.asyncio import AsyncSession
55
from sqlalchemy_crud_plus import CRUDPlus
66

7-
from backend.app.task.model.result import TaskResult
7+
from backend.app.task.model import TaskResult
88

99

1010
class CRUDTaskResult(CRUDPlus[TaskResult]):

backend/app/task/database.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
from celery import states
4+
from celery.backends.base import BaseBackend
5+
from celery.backends.database import retry, session_cleanup
6+
from celery.exceptions import ImproperlyConfigured
7+
from celery.utils.time import maybe_timedelta
8+
9+
from backend.app.task.model.result import Task, TaskExtended, TaskSet
10+
from backend.app.task.session import SessionManager
11+
12+
"""
13+
重写 from celery.backends.database 内部 DatabaseBackend 类,此类实现与模型配合不佳,导致 fba 创建表和 alembic 迁移困难
14+
"""
15+
16+
17+
class DatabaseBackend(BaseBackend):
    """The database result backend.

    Project-local rewrite of the ``DatabaseBackend`` class from
    ``celery.backends.database``. It stores results through the project's own
    ``Task`` / ``TaskExtended`` / ``TaskSet`` models and the project's
    ``SessionManager``, so that table creation and alembic migrations are
    driven by the project's models.
    """

    # ResultSet.iterate should sleep this much between each pool,
    # to not bombard the database with queries.
    subpolling_interval = 0.5

    # Model classes used for single-task results and group (taskset) results.
    task_cls = Task
    taskset_cls = TaskSet

    def __init__(self, dburi=None, engine_options=None, url=None, **kwargs):
        """Configure the backend from explicit arguments and the app config.

        :param dburi: database connection string; used when ``url`` is empty.
        :param engine_options: extra engine options; merged with (and
            overridden by) ``conf.database_engine_options``.
        :param url: database connection string; takes precedence over
            ``dburi`` and ``conf.database_url``.
        :param kwargs: forwarded to ``BaseBackend.__init__``; an optional
            ``short_lived_sessions`` key overrides the configured value.
        :raises ImproperlyConfigured: if no connection string can be resolved.
        """
        # The `url` argument was added later and is used by
        # the app to set backend by url (celery.app.backends.by_url)
        super().__init__(expires_type=maybe_timedelta, url=url, **kwargs)
        conf = self.app.conf

        # Switch to the model with the extra columns when extended results are on.
        if self.extended_result:
            self.task_cls = TaskExtended

        self.url = url or dburi or conf.database_url
        self.engine_options = dict(engine_options or {}, **conf.database_engine_options or {})
        self.short_lived_sessions = kwargs.get('short_lived_sessions', conf.database_short_lived_sessions)

        # Apply optional schema / table-name overrides to the mapped tables.
        schemas = conf.database_table_schemas or {}
        tablenames = conf.database_table_names or {}
        self.task_cls.configure(schema=schemas.get('task'), name=tablenames.get('task'))
        self.taskset_cls.configure(schema=schemas.get('group'), name=tablenames.get('group'))

        if not self.url:
            raise ImproperlyConfigured(
                'Missing connection string! Do you have the database_url setting set to a real value?'
            )

        self.session_manager = SessionManager()

        create_tables_at_setup = conf.database_create_tables_at_setup
        if create_tables_at_setup is True:
            self._create_tables()

    @property
    def extended_result(self):
        """Return the ``extended`` value from the ``result`` config namespace."""
        return self.app.conf.find_value_for_key('extended', 'result')

    def _create_tables(self):
        """Create the task and taskset tables."""
        # Table preparation, if any, is delegated to the session manager's
        # session factory; the returned session itself is not used here.
        self.ResultSession()

    def ResultSession(self, session_manager=None):
        """Return a session from ``session_manager`` (default: the backend's own)."""
        if session_manager is None:
            session_manager = self.session_manager
        return session_manager.session_factory(
            dburi=self.url, short_lived_sessions=self.short_lived_sessions, **self.engine_options
        )

    @retry
    def _store_result(self, task_id, result, state, traceback=None, request=None, **kwargs):
        """Store return value and state of an executed task."""
        session = self.ResultSession()
        with session_cleanup(session):
            task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
            task = task and task[0]
            if not task:
                # No row yet for this task id: insert one before updating it.
                task = self.task_cls(task_id)
                task.task_id = task_id
                session.add(task)
                session.flush()

            self._update_result(task, result, state, traceback=traceback, request=request)
            session.commit()

    def _update_result(self, task, result, state, traceback=None, request=None):
        """Copy the result meta for this call onto the ``task`` row object."""
        meta = self._get_result_meta(
            result=result, state=state, traceback=traceback, request=request, format_date=False, encode=True
        )

        # Exclude the primary key id and task_id columns
        # as we should not set it None
        columns = [column.name for column in self.task_cls.__table__.columns if column.name not in {'id', 'task_id'}]

        # Iterate through the columns name of the table
        # to set the value from meta.
        # If the value is not present in meta, set None
        for column in columns:
            value = meta.get(column)
            setattr(task, column, value)

    @retry
    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id."""
        session = self.ResultSession()
        with session_cleanup(session):
            task = list(session.query(self.task_cls).filter(self.task_cls.task_id == task_id))
            task = task and task[0]
            if not task:
                # Unknown task id: report a transient PENDING task with no result.
                task = self.task_cls(task_id)
                task.status = states.PENDING
                task.result = None
            data = task.to_dict()
            # args / kwargs only exist on the extended model and are stored encoded.
            if data.get('args', None) is not None:
                data['args'] = self.decode(data['args'])
            if data.get('kwargs', None) is not None:
                data['kwargs'] = self.decode(data['kwargs'])
            return self.meta_from_decoded(data)

    @retry
    def _save_group(self, group_id, result):
        """Store the result of an executed group."""
        session = self.ResultSession()
        with session_cleanup(session):
            group = self.taskset_cls(group_id, result)
            session.add(group)
            session.flush()
            session.commit()
            return result

    @retry
    def _restore_group(self, group_id):
        """Get meta-data for group by id (``None`` when no row matches)."""
        session = self.ResultSession()
        with session_cleanup(session):
            group = session.query(self.taskset_cls).filter(self.taskset_cls.taskset_id == group_id).first()
            if group:
                return group.to_dict()

    @retry
    def _delete_group(self, group_id):
        """Delete meta-data for group by id."""
        session = self.ResultSession()
        with session_cleanup(session):
            session.query(self.taskset_cls).filter(self.taskset_cls.taskset_id == group_id).delete()
            session.flush()
            session.commit()

    @retry
    def _forget(self, task_id):
        """Forget about result."""
        session = self.ResultSession()
        with session_cleanup(session):
            session.query(self.task_cls).filter(self.task_cls.task_id == task_id).delete()
            session.commit()

    def cleanup(self):
        """Delete expired meta-data.

        Does nothing when no expiry interval is configured.
        """
        expires = self.expires
        if not expires:
            # A missing (None) or zero expiry means results never expire.
            # Without this guard `now - None` raises TypeError and a zero
            # interval would delete every finished result.
            return

        session = self.ResultSession()
        now = self.app.now()
        with session_cleanup(session):
            session.query(self.task_cls).filter(self.task_cls.date_done < (now - expires)).delete()
            session.query(self.taskset_cls).filter(self.taskset_cls.date_done < (now - expires)).delete()
            session.commit()

    def __reduce__(self, args=(), kwargs=None):
        """Add the URL, expiry and engine options to the pickling kwargs."""
        kwargs = {} if not kwargs else kwargs
        kwargs.update({'dburi': self.url, 'expires': self.expires, 'engine_options': self.engine_options})
        return super().__reduce__(args, kwargs)

backend/app/task/model/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
from backend.app.task.model.result import TaskExtended as TaskResult
34
from backend.app.task.model.scheduler import TaskScheduler

backend/app/task/model/result.py

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,109 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3-
from celery.backends.database.models import TaskExtended as TaskResult
3+
from datetime import datetime, timezone
44

5-
OVERWRITE_CELERY_RESULT_TABLE_NAME = 'task_result'
6-
OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME = 'task_group_result'
5+
import sqlalchemy as sa
76

8-
# 重写表名配置
9-
TaskResult.configure(name=OVERWRITE_CELERY_RESULT_TABLE_NAME)
7+
from celery import states
8+
from sqlalchemy.types import PickleType
9+
10+
from backend.common.model import MappedBase
11+
12+
"""
13+
重写 celery.backends.database.models 内部所有模型,适配 fba 创建表和 alembic 迁移
14+
"""
15+
16+
17+
class Task(MappedBase):
    """Task result/status.

    One row per task id, holding its state, pickled result, completion time
    and traceback text.
    """

    __tablename__ = 'task_result'
    __table_args__ = {'comment': '任务结果表'}

    id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'), primary_key=True, autoincrement=True)
    task_id = sa.Column(sa.String(155), unique=True)
    status = sa.Column(sa.String(50), default=states.PENDING)
    result = sa.Column(PickleType, nullable=True)
    # Callables, not values: a bare `datetime.now(...)` here would be evaluated
    # once at import time and every row would get the process start time.
    date_done = sa.Column(
        sa.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=True,
    )
    traceback = sa.Column(sa.Text, nullable=True)

    def __init__(self, task_id):
        """Create a row object for the given task id."""
        self.task_id = task_id

    def to_dict(self):
        """Return the task id, status, result, traceback and completion date as a dict."""
        return {
            'task_id': self.task_id,
            'status': self.status,
            'result': self.result,
            'traceback': self.traceback,
            'date_done': self.date_done,
        }

    def __repr__(self):
        return f'<Task {self.task_id} state: {self.status}>'

    @classmethod
    def configure(cls, schema=None, name=None):
        """Set the schema of the table and id sequence, and the table name.

        :param schema: schema applied to both the table and the id sequence.
        :param name: table name; falls back to ``__tablename__`` when empty.
        """
        cls.__table__.schema = schema
        cls.id.default.schema = schema
        cls.__table__.name = name or cls.__tablename__
52+
53+
54+
class TaskExtended(Task):
    """Task result row with the additional "extended result" columns.

    Declared against the same ``task_result`` table as ``Task``
    (``extend_existing``), adding name, args, kwargs, worker, retries and queue.
    """

    __tablename__ = 'task_result'
    __table_args__ = {'extend_existing': True, 'comment': '任务结果表'}

    name = sa.Column(sa.String(155), nullable=True)
    args = sa.Column(sa.LargeBinary, nullable=True)
    kwargs = sa.Column(sa.LargeBinary, nullable=True)
    worker = sa.Column(sa.String(155), nullable=True)
    retries = sa.Column(sa.Integer, nullable=True)
    queue = sa.Column(sa.String(155), nullable=True)

    def to_dict(self):
        """Return the base ``Task`` dict plus the extended fields."""
        payload = super().to_dict()
        # Append the extended columns in their declared order.
        for field in ('name', 'args', 'kwargs', 'worker', 'retries', 'queue'):
            payload[field] = getattr(self, field)
        return payload
78+
79+
80+
class TaskSet(MappedBase):
    """TaskSet result.

    One row per group (taskset) id, holding its pickled result and
    completion time.
    """

    __tablename__ = 'task_set_result'
    __table_args__ = {'comment': '任务集结果表'}

    id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'), autoincrement=True, primary_key=True)
    taskset_id = sa.Column(sa.String(155), unique=True)
    result = sa.Column(PickleType, nullable=True)
    # Callable, not a value: a bare `datetime.now(...)` here would be evaluated
    # once at import time and reused for every inserted row.
    date_done = sa.Column(sa.DateTime, default=lambda: datetime.now(timezone.utc), nullable=True)

    def __init__(self, taskset_id, result):
        """Create a row object for the given group id and result."""
        self.taskset_id = taskset_id
        self.result = result

    def to_dict(self):
        """Return the group id, result and completion date as a dict."""
        return {
            'taskset_id': self.taskset_id,
            'result': self.result,
            'date_done': self.date_done,
        }

    def __repr__(self):
        return f'<TaskSet: {self.taskset_id}>'

    @classmethod
    def configure(cls, schema=None, name=None):
        """Set the schema of the table and id sequence, and the table name.

        :param schema: schema applied to both the table and the id sequence.
        :param name: table name; falls back to ``__tablename__`` when empty.
        """
        cls.__table__.schema = schema
        cls.id.default.schema = schema
        cls.__table__.name = name or cls.__tablename__

backend/app/task/service/result_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sqlalchemy import Select
44

55
from backend.app.task.crud.crud_result import task_result_dao
6-
from backend.app.task.model.result import TaskResult
6+
from backend.app.task.model import TaskResult
77
from backend.app.task.schema.result import DeleteTaskResultParam
88
from backend.common.exception import errors
99
from backend.database.db import async_db_session

backend/app/task/session.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
from celery.backends.database.session import SessionManager as CelerySessionManager
4+
5+
6+
class SessionManager(CelerySessionManager):
    """Override of Celery's ``SessionManager`` that starts out marked as prepared."""

    def __init__(self):
        """Run the parent initialisation, then set the ``prepared`` flag."""
        super().__init__()

        # Disable automatic creation of the task result tables that Celery
        # defines internally.
        self.prepared = True

0 commit comments

Comments
 (0)