Skip to content

Commit 305f925

Browse files
continue
1 parent a1de0be commit 305f925

File tree

6 files changed

+21
-36
lines changed

6 files changed

+21
-36
lines changed

packages/celery-library/src/celery_library/signals.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def _init(startup_complete_event: threading.Event) -> None:
4040
shutdown_event = asyncio.Event()
4141

4242
fastapi_app = app_factory()
43+
assert isinstance(fastapi_app, FastAPI) # nosec
4344

4445
async def setup_task_worker():
4546
redis_client_sdk = RedisClientSDK(

packages/celery-library/src/celery_library/task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,4 +205,5 @@ def register_task( # type: ignore[misc]
205205
bind=True,
206206
base=AbortableTask,
207207
time_limit=None if timeout is None else timeout.total_seconds(),
208+
pydantic=True,
208209
)(wrapped_fn)
Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
from functools import partial
21
from pathlib import Path
3-
from typing import Any
42

53
from kombu.utils.json import register_type # type: ignore[import-untyped]
6-
from pydantic import BaseModel
74

85

96
def _path_encoder(obj):
@@ -23,14 +20,6 @@ def _class_full_name(clz: type) -> str:
2320
return ".".join([clz.__module__, clz.__qualname__])
2421

2522

26-
def _pydantic_model_encoder(obj: BaseModel, *args, **kwargs) -> dict[str, Any]:
27-
return obj.model_dump(*args, **kwargs, mode="json")
28-
29-
30-
def _pydantic_model_decoder(clz: type[BaseModel], data: dict[str, Any]) -> BaseModel:
31-
return clz(**data)
32-
33-
3423
def register_celery_types() -> None:
3524
register_type(
3625
Path,
@@ -39,13 +28,3 @@ def register_celery_types() -> None:
3928
_path_decoder,
4029
)
4130
register_type(set, _class_full_name(set), encoder=list, decoder=set)
42-
43-
44-
def register_pydantic_types(*models: type[BaseModel]) -> None:
45-
for model in models:
46-
register_type(
47-
model,
48-
_class_full_name(model),
49-
encoder=_pydantic_model_encoder,
50-
decoder=partial(_pydantic_model_decoder, model),
51-
)

services/storage/src/simcore_service_storage/api/_worker_tasks/tasks.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,10 @@
22

33
from celery import Celery # type: ignore[import-untyped]
44
from celery_library.task import register_task
5-
from celery_library.types import register_celery_types, register_pydantic_types
5+
from celery_library.types import register_celery_types
66
from models_library.api_schemas_storage.export_data_async_jobs import AccessRightError
7-
from models_library.api_schemas_storage.storage_schemas import (
8-
FileUploadCompletionBody,
9-
FoldersBody,
10-
)
117
from servicelib.logging_utils import log_context
128

13-
from ...models import FileMetaData
149
from ._files import complete_upload_file
1510
from ._paths import compute_path_size, delete_paths
1611
from ._simcore_s3 import deep_copy_files_from_project, export_data
@@ -20,9 +15,6 @@
2015

2116
def setup_worker_tasks(app: Celery) -> None:
2217
register_celery_types()
23-
register_pydantic_types(FileMetaData)
24-
register_pydantic_types(FileUploadCompletionBody)
25-
register_pydantic_types(FoldersBody)
2618

2719
with log_context(_logger, logging.INFO, msg="worker task registration"):
2820
register_task(app, export_data, dont_autoretry_for=(AccessRightError,))

services/storage/src/simcore_service_storage/modules/celery/worker_main.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,18 @@
2626
)
2727

2828

29-
assert _settings.STORAGE_CELERY
29+
assert _settings.STORAGE_CELERY # nosec
3030
app = create_celery_app(_settings.STORAGE_CELERY)
31-
app_factory = partial(create_app(_settings))
32-
worker_init.connect(partial(on_worker_init, app_factory, _settings.STORAGE_CELERY))
31+
app_factory = partial(create_app, _settings)
32+
33+
34+
def worker_init_wrapper(sender, **_kwargs):
35+
return partial(on_worker_init, app_factory, _settings.STORAGE_CELERY)(
36+
sender, **_kwargs
37+
)
38+
39+
40+
worker_init.connect(worker_init_wrapper)
3341
worker_shutdown.connect(on_worker_shutdown)
3442

3543

services/storage/tests/conftest.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,9 +1001,13 @@ async def with_storage_celery_worker_controller(
10011001
monkeypatch.setenv("STORAGE_WORKER_MODE", "true")
10021002
app_settings = ApplicationSettings.create_from_envs()
10031003
app_factory = partial(create_app, app_settings)
1004-
worker_init.connect(
1005-
partial(on_worker_init, app_factory, app_settings.STORAGE_CELERY)
1006-
)
1004+
1005+
def _on_worker_init_wrapper(sender: Celery, **_kwargs) -> None:
1006+
return partial(on_worker_init, app_factory, app_settings.STORAGE_CELERY)(
1007+
sender, **_kwargs
1008+
)
1009+
1010+
worker_init.connect(_on_worker_init_wrapper)
10071011
worker_shutdown.connect(on_worker_shutdown)
10081012

10091013
setup_worker_tasks(celery_app)

0 commit comments

Comments
 (0)