diff --git a/backend/app/task/celery.py b/backend/app/task/celery.py index 24076a118..e86e29912 100644 --- a/backend/app/task/celery.py +++ b/backend/app/task/celery.py @@ -15,7 +15,7 @@ def find_task_packages(): packages = [] for root, dirs, files in os.walk(os.path.join(BASE_PATH, 'app', 'task', 'tasks')): if 'tasks.py' in files: - package = root.replace(str(BASE_PATH) + os.path.sep, '').replace(os.path.sep, '.') + package = root.replace(str(BASE_PATH.parent) + os.path.sep, '').replace(os.path.sep, '.') packages.append(package) return packages @@ -45,8 +45,8 @@ def init_celery() -> celery.Celery: result_extended=True, # result_expires=0, # 任务结果自动清理,0 或 None 表示不清理 beat_schedule=LOCAL_BEAT_SCHEDULE, - beat_scheduler='app.task.utils.schedulers:DatabaseScheduler', - task_cls='app.task.tasks.base:TaskBase', + beat_scheduler='backend.app.task.utils.schedulers:DatabaseScheduler', + task_cls='backend.app.task.tasks.base:TaskBase', task_track_started=True, enable_utc=False, timezone=settings.DATETIME_TIMEZONE, diff --git a/backend/celery-start.sh b/backend/celery-start.sh index 9524e1d73..965c2f4bc 100644 --- a/backend/celery-start.sh +++ b/backend/celery-start.sh @@ -1,10 +1,10 @@ #!/usr/bin/env bash # work && beat -celery -A app.task.celery worker -l info -P gevent -c 100 & +celery -A backend.app.task.celery worker -l info -P gevent -c 100 & # beat -celery -A app.task.celery beat -l info & +celery -A backend.app.task.celery beat -l info & # flower -celery -A app.task.celery flower --port=8555 --basic-auth=admin:123456 +celery -A backend.app.task.celery flower --port=8555 --basic-auth=admin:123456 diff --git a/backend/cli.py b/backend/cli.py index a7c759cbe..3baa9ca40 100644 --- a/backend/cli.py +++ b/backend/cli.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio +import subprocess from dataclasses import dataclass -from typing import Annotated +from typing import Annotated, Literal import cappa import granian @@ -49,6 +50,34 @@ def run(host: str, port: int, reload: bool, workers: int | None) -> None: ).serve() +def run_celery_worker(log_level: Literal['info', 'debug']) -> None: + try: + subprocess.run(['celery', '-A', 'backend.app.task.celery', 'worker', '-l', f'{log_level}', '-P', 'gevent']) + except KeyboardInterrupt: + pass + + +def run_celery_beat(log_level: Literal['info', 'debug']) -> None: + try: + subprocess.run(['celery', '-A', 'backend.app.task.celery', 'beat', '-l', f'{log_level}']) + except KeyboardInterrupt: + pass + + +def run_celery_flower(port: int, basic_auth: str) -> None: + try: + subprocess.run([ + 'celery', + '-A', + 'backend.app.task.celery', + 'flower', + f'--port={port}', + f'--basic-auth={basic_auth}', + ]) + except KeyboardInterrupt: + pass + + async def install_plugin( path: str, repo_url: str, no_sql: bool, db_type: DataBaseType, pk_type: PrimaryKeyType ) -> None: @@ -89,7 +118,7 @@ async def execute_sql_scripts(sql_scripts: str) -> None: console.print(Text('SQL 脚本已执行完成', style='bold green')) -@cappa.command(help='运行服务') +@cappa.command(help='运行 API 服务') @dataclass class Run: host: Annotated[ @@ -118,6 +147,49 @@ def __call__(self): run(host=self.host, port=self.port, reload=self.no_reload, workers=self.workers) +@cappa.command(help='从当前主机启动 Celery worker 服务') +@dataclass +class Worker: + log_level: Annotated[ + Literal['info', 'debug'], + cappa.Arg(long=True, short='-l', default='info', help='日志输出级别'), + ] + + def __call__(self): + run_celery_worker(log_level=self.log_level) + + +@cappa.command(help='从当前主机启动 Celery beat 服务') +@dataclass +class Beat: + log_level: Annotated[ + Literal['info', 'debug'], + cappa.Arg(long=True, short='-l', default='info', help='日志输出级别'), + ] + + def __call__(self): + run_celery_beat(log_level=self.log_level) + + +@cappa.command(help='从当前主机启动 Celery flower 服务') +@dataclass +class Flower: + port: Annotated[int, cappa.Arg(long=True, default=8555, help='提供服务的主机端口号')] + basic_auth: Annotated[str, cappa.Arg(long=True, default='admin:123456', help='页面登录的用户名和密码')] + + def __call__(self): + run_celery_flower(port=self.port, basic_auth=self.basic_auth) + + +@cappa.command(help='运行 Celery 服务') +@dataclass +class Celery: + subcmd: cappa.Subcommands[Worker | Beat | Flower | None] = None + + def __call__(self): + console.print('\n更多信息,尝试 "[cyan]--help[/]"') + + @cappa.command(help='新增插件') @dataclass class Add: @@ -151,13 +223,13 @@ async def __call__(self): class FbaCli: version: Annotated[ bool, - cappa.Arg(short='-V', long=True, default=False, help='打印当前版本号'), + cappa.Arg(short='-V', long=True, default=False, show_default=False, help='打印当前版本号'), ] sql: Annotated[ str, - cappa.Arg(long=True, default='', help='在事务中执行 SQL 脚本'), + cappa.Arg(value_name='PATH', long=True, default='', show_default=False, help='在事务中执行 SQL 脚本'), ] - subcmd: cappa.Subcommands[Run | Add | None] = None + subcmd: cappa.Subcommands[Run | Celery | Add | None] = None async def __call__(self): if self.version: