|
1 | | -from celery_library.common import create_app, create_task_manager |
| 1 | +from celery_library.backends._redis import RedisTaskInfoStore |
| 2 | +from celery_library.common import create_app |
2 | 3 | from celery_library.task_manager import CeleryTaskManager |
3 | 4 | from celery_library.types import register_celery_types, register_pydantic_types |
4 | 5 | from fastapi import FastAPI |
5 | 6 | from models_library.api_schemas_storage.storage_schemas import ( |
6 | 7 | FileUploadCompletionBody, |
7 | 8 | FoldersBody, |
8 | 9 | ) |
| 10 | +from servicelib.redis import RedisClientSDK |
9 | 11 | from settings_library.celery import CelerySettings |
| 12 | +from settings_library.redis import RedisDatabase |
10 | 13 |
|
11 | 14 | from ...models import FileMetaData |
12 | 15 |
|
13 | 16 |
|
14 | 17 | def setup_task_manager(app: FastAPI, celery_settings: CelerySettings) -> None: |
15 | 18 | async def on_startup() -> None: |
16 | | - app.state.task_manager = await create_task_manager( |
17 | | - create_app(celery_settings), celery_settings |
| 19 | + redis_client_sdk = RedisClientSDK( |
| 20 | + celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn( |
| 21 | + RedisDatabase.CELERY_TASKS |
| 22 | + ), |
| 23 | + client_name="celery_tasks", |
| 24 | + ) |
| 25 | + app.state.celery_tasks_redis_client_sdk = redis_client_sdk |
| 26 | + await redis_client_sdk.setup() |
| 27 | + |
| 28 | + app.state.task_manager = CeleryTaskManager( |
| 29 | + create_app(celery_settings), |
| 30 | + celery_settings, |
| 31 | + RedisTaskInfoStore(redis_client_sdk), |
18 | 32 | ) |
19 | 33 |
|
20 | 34 | register_celery_types() |
21 | 35 | register_pydantic_types(FileUploadCompletionBody, FileMetaData, FoldersBody) |
22 | 36 |
|
| 37 | + async def on_shutdown() -> None: |
| 38 | + redis_client_sdk: RedisClientSDK = app.state.celery_tasks_redis_client_sdk |
| 39 | + await redis_client_sdk.shutdown() |
| 40 | + |
23 | 41 | app.add_event_handler("startup", on_startup) |
| 42 | + app.add_event_handler("shutdown", on_shutdown) |
24 | 43 |
|
25 | 44 |
|
26 | 45 | def get_task_manager_from_app(app: FastAPI) -> CeleryTaskManager: |
|
0 commit comments