Skip to content

Commit 3d0bbb5

Browse files
move method
1 parent c01d337 commit 3d0bbb5

File tree

3 files changed

+30
-38
lines changed

3 files changed

+30
-38
lines changed

packages/celery-library/src/celery_library/__init__.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22
from asyncio import AbstractEventLoop
33

44
from fastapi import FastAPI
5-
from servicelib.redis._client import RedisClientSDK
65
from settings_library.celery import CelerySettings
7-
from settings_library.redis import RedisDatabase
86

9-
from .backends._redis import RedisTaskInfoStore
10-
from .common import create_app
7+
from .common import create_app, create_task_manager
118
from .task_manager import CeleryTaskManager
129
from .types import register_celery_types
1310

@@ -16,18 +13,8 @@
1613

1714
def setup_celery_client(app: FastAPI, celery_settings: CelerySettings) -> None:
1815
async def on_startup() -> None:
19-
celery_app = create_app(celery_settings)
20-
redis_client_sdk = RedisClientSDK(
21-
celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
22-
RedisDatabase.CELERY_TASKS
23-
),
24-
client_name=f"{app.title}.celery_tasks",
25-
)
26-
27-
app.state.celery_client = CeleryTaskManager(
28-
celery_app,
29-
celery_settings,
30-
RedisTaskInfoStore(redis_client_sdk),
16+
app.state.celery_client = create_task_manager(
17+
create_app(celery_settings), celery_settings
3118
)
3219

3320
register_celery_types()

packages/celery-library/src/celery_library/common.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import logging
21
import ssl
32
from typing import Any
43

54
from celery import Celery # type: ignore[import-untyped]
5+
from servicelib.redis import RedisClientSDK
66
from settings_library.celery import CelerySettings
77
from settings_library.redis import RedisDatabase
88

9-
_logger = logging.getLogger(__name__)
9+
from .backends._redis import RedisTaskInfoStore
10+
from .task_manager import CeleryTaskManager
1011

1112

1213
def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
@@ -25,13 +26,28 @@ def _celery_configure(celery_settings: CelerySettings) -> dict[str, Any]:
2526
return base_config
2627

2728

28-
def create_app(celery_settings: CelerySettings) -> Celery:
29-
assert celery_settings
29+
def create_app(settings: CelerySettings) -> Celery:
30+
assert settings
3031

3132
return Celery(
32-
broker=celery_settings.CELERY_RABBIT_BROKER.dsn,
33-
backend=celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
33+
broker=settings.CELERY_RABBIT_BROKER.dsn,
34+
backend=settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
3435
RedisDatabase.CELERY_TASKS,
3536
),
36-
**_celery_configure(celery_settings),
37+
**_celery_configure(settings),
38+
)
39+
40+
41+
def create_task_manager(app: Celery, settings: CelerySettings) -> CeleryTaskManager:
42+
redis_client_sdk = RedisClientSDK(
43+
settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
44+
RedisDatabase.CELERY_TASKS
45+
),
46+
client_name="celery_tasks",
47+
)
48+
49+
return CeleryTaskManager(
50+
app,
51+
settings,
52+
RedisTaskInfoStore(redis_client_sdk),
3753
)

packages/celery-library/src/celery_library/signals.py

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010
from celery.worker.worker import WorkController # type: ignore[import-untyped]
1111
from fastapi import FastAPI
1212
from servicelib.logging_utils import log_context
13-
from servicelib.redis._client import RedisClientSDK
1413
from settings_library.celery import CelerySettings
15-
from settings_library.redis import RedisDatabase
1614

1715
from . import set_event_loop
18-
from .backends._redis import RedisTaskInfoStore
19-
from .task_manager import CeleryTaskManager
16+
from .common import create_task_manager
2017
from .utils import (
2118
get_fastapi_app,
2219
set_fastapi_app,
@@ -45,22 +42,14 @@ def _init(startup_complete_event: threading.Event) -> None:
4542
fastapi_app = app_factory()
4643
assert isinstance(fastapi_app, FastAPI) # nosec
4744

48-
async def setup_task_worker():
49-
redis_client_sdk = RedisClientSDK(
50-
celery_settings.CELERY_REDIS_RESULT_BACKEND.build_redis_dsn(
51-
RedisDatabase.CELERY_TASKS
52-
),
53-
client_name=f"{fastapi_app.title}.celery_tasks",
54-
)
55-
45+
async def setup_task_manager():
5646
assert sender.app # nosec
5747
assert isinstance(sender.app, Celery) # nosec
5848
set_task_manager(
5949
sender.app,
60-
CeleryTaskManager(
50+
create_task_manager(
6151
sender.app,
6252
celery_settings,
63-
RedisTaskInfoStore(redis_client_sdk),
6453
),
6554
)
6655

@@ -83,7 +72,7 @@ async def fastapi_lifespan(
8372
set_event_loop(fastapi_app, loop)
8473

8574
set_fastapi_app(sender.app, fastapi_app)
86-
loop.run_until_complete(setup_task_worker())
75+
loop.run_until_complete(setup_task_manager())
8776
loop.run_until_complete(
8877
fastapi_lifespan(startup_complete_event, shutdown_event)
8978
)

0 commit comments

Comments
 (0)