Skip to content

Commit 0ff0e3c

Browse files
refactor models
1 parent a53a97e commit 0ff0e3c

File tree

23 files changed

+116
-89
lines changed

23 files changed

+116
-89
lines changed

β€Žpackages/celery-library/src/celery_library/backends/_redis.pyβ€Ž

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@
55

66
from models_library.progress_bar import ProgressReport
77
from pydantic import ValidationError
8-
from servicelib.redis._client import RedisClientSDK
9-
10-
from ..models import (
8+
from servicelib.queued_tasks.models import (
119
Task,
1210
TaskContext,
1311
TaskID,
1412
TaskMetadata,
1513
TaskUUID,
16-
build_task_id_prefix,
1714
)
15+
from servicelib.redis._client import RedisClientSDK
16+
17+
from ..utils import build_task_id_prefix
1818

1919
_CELERY_TASK_INFO_PREFIX: Final[str] = "celery-task-info-"
2020
_CELERY_TASK_ID_KEY_ENCODING = "utf-8"

β€Žpackages/celery-library/src/celery_library/signals.pyβ€Ž

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@
44

55
from celery import Celery # type: ignore[import-untyped]
66
from celery.worker.worker import WorkController # type: ignore[import-untyped]
7-
from servicelib.celery.app_server import STARTUP_TIMEOUT, BaseAppServer
87
from servicelib.logging_utils import log_context
8+
from servicelib.queued_tasks.app_server import STARTUP_TIMEOUT, BaseAppServer
99
from settings_library.celery import CelerySettings
1010

1111
from .common import create_task_manager
12-
from .utils import (
13-
get_app_server,
14-
set_app_server,
15-
set_task_manager,
16-
)
12+
from .utils import get_app_server, set_app_server
1713

1814
_logger = logging.getLogger(__name__)
1915

@@ -36,22 +32,19 @@ async def _setup_task_manager():
3632
assert sender.app # nosec
3733
assert isinstance(sender.app, Celery) # nosec
3834

39-
set_app_server(sender.app, app_server)
40-
set_task_manager(
35+
app_server.task_manager = await create_task_manager(
4136
sender.app,
42-
await create_task_manager(
43-
sender.app,
44-
celery_settings,
45-
),
37+
celery_settings,
4638
)
4739

48-
async def _setup_app_server():
49-
await app_server.startup(startup_complete_event, shutdown_event)
40+
set_app_server(sender.app, app_server)
5041

5142
app_server.event_loop = loop
5243

5344
loop.run_until_complete(_setup_task_manager())
54-
loop.run_until_complete(_setup_app_server())
45+
loop.run_until_complete(
46+
app_server.startup(startup_complete_event, shutdown_event)
47+
)
5548

5649
thread = threading.Thread(
5750
group=None,

β€Žpackages/celery-library/src/celery_library/task.pyβ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
from celery.exceptions import Ignore # type: ignore[import-untyped]
1515
from pydantic import NonNegativeInt
1616
from servicelib.async_utils import cancel_wait_task
17+
from servicelib.queued_tasks.models import TaskID
1718

1819
from .errors import encode_celery_transferrable_error
19-
from .models import TaskID
2020
from .utils import get_app_server
2121

2222
_logger = logging.getLogger(__name__)

β€Žpackages/celery-library/src/celery_library/task_manager.pyβ€Ž

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
from common_library.async_tools import make_async
1111
from models_library.progress_bar import ProgressReport
1212
from servicelib.logging_utils import log_context
13-
from settings_library.celery import CelerySettings
14-
15-
from .models import (
13+
from servicelib.queued_tasks.models import (
1614
Task,
1715
TaskContext,
1816
TaskID,
@@ -21,8 +19,10 @@
2119
TaskState,
2220
TaskStatus,
2321
TaskUUID,
24-
build_task_id,
2522
)
23+
from settings_library.celery import CelerySettings
24+
25+
from .utils import build_task_id
2626

2727
_logger = logging.getLogger(__name__)
2828

β€Žpackages/celery-library/src/celery_library/utils.pyβ€Ž

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,24 @@
1-
from celery import Celery # type: ignore[import-untyped]
2-
from servicelib.celery.app_server import BaseAppServer
1+
from typing import Final
32

4-
from .task_manager import CeleryTaskManager
3+
from celery import Celery # type: ignore[import-untyped]
4+
from servicelib.queued_tasks.app_server import BaseAppServer
5+
from servicelib.queued_tasks.models import TaskContext, TaskID, TaskUUID
56

67
_APP_SERVER_KEY = "app_server"
7-
_TASK_MANAGER_KEY = "task_manager"
8+
9+
_TASK_ID_KEY_DELIMITATOR: Final[str] = ":"
10+
11+
12+
def build_task_id_prefix(task_context: TaskContext) -> str:
13+
return _TASK_ID_KEY_DELIMITATOR.join(
14+
[f"{task_context[key]}" for key in sorted(task_context)]
15+
)
16+
17+
18+
def build_task_id(task_context: TaskContext, task_uuid: TaskUUID) -> TaskID:
19+
return _TASK_ID_KEY_DELIMITATOR.join(
20+
[build_task_id_prefix(task_context), f"{task_uuid}"]
21+
)
822

923

1024
def get_app_server(app: Celery) -> BaseAppServer:
@@ -15,13 +29,3 @@ def get_app_server(app: Celery) -> BaseAppServer:
1529

1630
def set_app_server(app: Celery, app_server: BaseAppServer) -> None:
1731
app.conf[_APP_SERVER_KEY] = app_server
18-
19-
20-
def get_task_manager(celery_app: Celery) -> CeleryTaskManager:
21-
worker = celery_app.conf[_TASK_MANAGER_KEY]
22-
assert isinstance(worker, CeleryTaskManager)
23-
return worker
24-
25-
26-
def set_task_manager(celery_app: Celery, worker: CeleryTaskManager) -> None:
27-
celery_app.conf[_TASK_MANAGER_KEY] = worker

β€Žpackages/celery-library/tests/conftest.pyβ€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
from celery.worker.worker import WorkController
1414
from celery_library.common import create_task_manager
1515
from celery_library.signals import on_worker_init, on_worker_shutdown
16-
from celery_library.utils import CeleryTaskManager
16+
from celery_library.task_manager import CeleryTaskManager
1717
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
1818
from pytest_simcore.helpers.typing_env import EnvVarsDict
19-
from servicelib.celery.app_server import BaseAppServer
19+
from servicelib.queued_tasks.app_server import BaseAppServer
2020
from settings_library.celery import CelerySettings
2121
from settings_library.redis import RedisSettings
2222

β€Žpackages/celery-library/tests/unit/test_tasks.pyβ€Ž

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@
1414
from celery import Celery, Task
1515
from celery.contrib.abortable import AbortableTask
1616
from celery_library.errors import TransferrableCeleryError
17-
from celery_library.models import (
18-
TaskContext,
19-
TaskID,
20-
TaskMetadata,
21-
TaskState,
22-
)
2317
from celery_library.task import (
2418
AbortableAsyncResult,
2519
register_task,
2620
)
2721
from celery_library.task_manager import CeleryTaskManager
28-
from celery_library.utils import get_app_server, get_task_manager
22+
from celery_library.utils import get_app_server
2923
from common_library.errors_classes import OsparcErrorMixin
3024
from models_library.progress_bar import ProgressReport
3125
from servicelib.logging_utils import log_context
26+
from servicelib.queued_tasks.models import (
27+
TaskContext,
28+
TaskID,
29+
TaskMetadata,
30+
TaskState,
31+
)
3232
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
3333

3434
_logger = logging.getLogger(__name__)
@@ -40,14 +40,12 @@
4040
async def _fake_file_processor(
4141
celery_app: Celery, task_name: str, task_id: str, files: list[str]
4242
) -> str:
43-
worker = get_task_manager(celery_app)
44-
4543
def sleep_for(seconds: float) -> None:
4644
time.sleep(seconds)
4745

4846
for n, file in enumerate(files, start=1):
4947
with log_context(_logger, logging.INFO, msg=f"Processing file {file}"):
50-
await worker.set_task_progress(
48+
await get_app_server(celery_app).task_manager.set_task_progress(
5149
task_id=task_id,
5250
report=ProgressReport(actual_value=n / len(files)),
5351
)

β€Žpackages/service-library/src/servicelib/fastapi/celery/app_server.pyβ€Ž renamed to β€Žpackages/service-library/src/servicelib/fastapi/queued_tasks/app_server.pyβ€Ž

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from asgi_lifespan import LifespanManager
55
from fastapi import FastAPI
66

7-
from ...celery.app_server import BaseAppServer
7+
from ...queued_tasks.app_server import BaseAppServer
88

99
_SHUTDOWN_TIMEOUT: Final[float] = timedelta(seconds=10).total_seconds()
1010
_STARTUP_TIMEOUT: Final[float] = timedelta(minutes=1).total_seconds()

0 commit comments

Comments
Β (0)