Skip to content

Commit f7a6279

Browse files
unify interfaces
1 parent e813cdb commit f7a6279

File tree

13 files changed

+51
-64
lines changed

13 files changed

+51
-64
lines changed

packages/celery-library/src/celery_library/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from settings_library.redis import RedisDatabase
88

99
from .backends._redis import RedisTaskInfoStore
10-
from .client import CeleryTaskClient
1110
from .common import create_app
11+
from .task_manager import CeleryTaskManager
1212
from .types import register_celery_types
1313

1414
_logger = logging.getLogger(__name__)
@@ -24,7 +24,7 @@ async def on_startup() -> None:
2424
client_name=f"{app.title}.celery_tasks",
2525
)
2626

27-
app.state.celery_client = CeleryTaskClient(
27+
app.state.celery_client = CeleryTaskManager(
2828
celery_app,
2929
celery_settings,
3030
RedisTaskInfoStore(redis_client_sdk),
@@ -35,10 +35,10 @@ async def on_startup() -> None:
3535
app.add_event_handler("startup", on_startup)
3636

3737

38-
def get_celery_client(app: FastAPI) -> CeleryTaskClient:
38+
def get_celery_client(app: FastAPI) -> CeleryTaskManager:
3939
assert hasattr(app.state, "celery_client") # nosec
4040
celery_client = app.state.celery_client
41-
assert isinstance(celery_client, CeleryTaskClient)
41+
assert isinstance(celery_client, CeleryTaskManager)
4242
return celery_client
4343

4444

packages/celery-library/src/celery_library/signals.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
from . import set_event_loop
1818
from .backends._redis import RedisTaskInfoStore
19+
from .task_manager import CeleryTaskManager
1920
from .utils import (
2021
get_fastapi_app,
2122
set_celery_worker,
2223
set_fastapi_app,
2324
)
24-
from .worker import CeleryTaskWorker
2525

2626
_logger = logging.getLogger(__name__)
2727

@@ -57,7 +57,9 @@ async def setup_task_worker():
5757
assert isinstance(sender.app, Celery) # nosec
5858
set_celery_worker(
5959
sender.app,
60-
CeleryTaskWorker(
60+
CeleryTaskManager(
61+
sender.app,
62+
celery_settings,
6163
RedisTaskInfoStore(redis_client_sdk),
6264
),
6365
)

packages/celery-library/src/celery_library/client.py renamed to packages/celery-library/src/celery_library/task_manager.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
_MAX_PROGRESS_VALUE = 1.0
3232

3333

34-
@dataclass
35-
class CeleryTaskClient:
34+
@dataclass(frozen=True)
35+
class CeleryTaskManager:
3636
_celery_app: Celery
3737
_celery_settings: CelerySettings
3838
_task_info_store: TaskInfoStore
@@ -155,3 +155,9 @@ async def list_tasks(self, task_context: TaskContext) -> list[Task]:
155155
msg=f"Listing tasks: {task_context=}",
156156
):
157157
return await self._task_info_store.list_tasks(task_context)
158+
159+
async def set_task_progress(self, task_id: TaskID, report: ProgressReport) -> None:
160+
await self._task_info_store.set_task_progress(
161+
task_id=task_id,
162+
report=report,
163+
)

packages/celery-library/src/celery_library/utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
from celery import Celery # type: ignore[import-untyped]
22
from fastapi import FastAPI
33

4-
from .worker import CeleryTaskWorker
4+
from .task_manager import CeleryTaskManager
55

66
_WORKER_KEY = "celery_worker"
77
_FASTAPI_APP_KEY = "fastapi_app"
88

99

10-
def set_celery_worker(celery_app: Celery, worker: CeleryTaskWorker) -> None:
10+
def set_celery_worker(celery_app: Celery, worker: CeleryTaskManager) -> None:
1111
celery_app.conf[_WORKER_KEY] = worker
1212

1313

14-
def get_celery_worker(celery_app: Celery) -> CeleryTaskWorker:
14+
def get_celery_worker(celery_app: Celery) -> CeleryTaskManager:
1515
worker = celery_app.conf[_WORKER_KEY]
16-
assert isinstance(worker, CeleryTaskWorker)
16+
assert isinstance(worker, CeleryTaskManager)
1717
return worker
1818

1919

packages/celery-library/src/celery_library/worker.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

services/storage/src/simcore_service_storage/api/rest/_files.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
from typing import Annotated, Final, cast
33
from urllib.parse import quote
44

5-
from celery_library.client import CeleryTaskClient
65
from celery_library.models import TaskMetadata, TaskUUID
6+
from celery_library.task_manager import CeleryTaskManager
77
from fastapi import APIRouter, Depends, Header, Request
88
from models_library.api_schemas_rpc_async_jobs.async_jobs import AsyncJobNameData
99
from models_library.api_schemas_storage.storage_schemas import (
@@ -270,7 +270,7 @@ async def abort_upload_file(
270270
status_code=status.HTTP_202_ACCEPTED,
271271
)
272272
async def complete_upload_file(
273-
celery_client: Annotated[CeleryTaskClient, Depends(get_celery_client)],
273+
celery_client: Annotated[CeleryTaskManager, Depends(get_celery_client)],
274274
query_params: Annotated[StorageQueryParamsBase, Depends()],
275275
location_id: LocationID,
276276
file_id: StorageFileID,
@@ -326,7 +326,7 @@ async def complete_upload_file(
326326
response_model=Envelope[FileUploadCompleteFutureResponse],
327327
)
328328
async def is_completed_upload_file(
329-
celery_client: Annotated[CeleryTaskClient, Depends(get_celery_client)],
329+
celery_client: Annotated[CeleryTaskManager, Depends(get_celery_client)],
330330
query_params: Annotated[StorageQueryParamsBase, Depends()],
331331
location_id: LocationID,
332332
file_id: StorageFileID,
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from typing import Annotated
22

33
from celery_library import get_celery_client as _get_celery_client_from_app
4-
from celery_library.client import CeleryTaskClient
4+
from celery_library.task_manager import CeleryTaskManager
55
from fastapi import Depends, FastAPI
66
from servicelib.fastapi.dependencies import get_app
77

88

99
def get_celery_client(
1010
app: Annotated[FastAPI, Depends(get_app)],
11-
) -> CeleryTaskClient:
11+
) -> CeleryTaskManager:
1212
return _get_celery_client_from_app(app)

services/storage/tests/conftest.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
from celery.signals import worker_init, worker_shutdown
2828
from celery.worker.worker import WorkController
2929
from celery_library.signals import on_worker_init, on_worker_shutdown
30+
from celery_library.task_manager import CeleryTaskManager
3031
from celery_library.utils import get_celery_worker
31-
from celery_library.worker import CeleryTaskWorker
3232
from faker import Faker
3333
from fakeredis.aioredis import FakeRedis
3434
from fastapi import FastAPI
@@ -364,7 +364,7 @@ def upload_file(
364364
create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]],
365365
create_file_of_size: Callable[[ByteSize, str | None], Path],
366366
create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID],
367-
with_storage_celery_worker: CeleryTaskWorker,
367+
with_storage_celery_worker: CeleryTaskManager,
368368
) -> Callable[
369369
[ByteSize, str, SimcoreS3FileID | None], Awaitable[tuple[Path, SimcoreS3FileID]]
370370
]:
@@ -479,7 +479,7 @@ async def create_empty_directory(
479479
create_simcore_file_id: Callable[[ProjectID, NodeID, str], SimcoreS3FileID],
480480
create_upload_file_link_v2: Callable[..., Awaitable[FileUploadSchema]],
481481
client: httpx.AsyncClient,
482-
with_storage_celery_worker: CeleryTaskWorker,
482+
with_storage_celery_worker: CeleryTaskManager,
483483
) -> Callable[[str, ProjectID, NodeID], Awaitable[SimcoreS3FileID]]:
484484
async def _directory_creator(
485485
dir_name: str, project_id: ProjectID, node_id: NodeID
@@ -1029,7 +1029,7 @@ def _on_worker_init_wrapper(sender: WorkController, **_kwargs) -> None:
10291029
@pytest.fixture
10301030
def with_storage_celery_worker(
10311031
with_storage_celery_worker_controller: TestWorkController,
1032-
) -> CeleryTaskWorker:
1032+
) -> CeleryTaskManager:
10331033
assert isinstance(with_storage_celery_worker_controller.app, Celery)
10341034
return get_celery_worker(with_storage_celery_worker_controller.app)
10351035

services/storage/tests/unit/test_async_jobs.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@
1111
import pytest
1212
from celery import Celery, Task
1313
from celery_library import get_celery_client
14-
from celery_library.client import TaskMetadata
15-
from celery_library.models import TaskID
14+
from celery_library.models import TaskID, TaskMetadata
1615
from celery_library.task import register_task
17-
from celery_library.worker import CeleryTaskWorker
16+
from celery_library.task_manager import CeleryTaskManager
1817
from fastapi import FastAPI
1918
from models_library.api_schemas_rpc_async_jobs.async_jobs import (
2019
AsyncJobGet,
@@ -203,7 +202,7 @@ async def test_async_jobs_workflow(
203202
initialized_app: FastAPI,
204203
register_rpc_routes: None,
205204
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
206-
with_storage_celery_worker: CeleryTaskWorker,
205+
with_storage_celery_worker: CeleryTaskManager,
207206
user_id: UserID,
208207
product_name: ProductName,
209208
exposed_rpc_start: str,
@@ -251,7 +250,7 @@ async def test_async_jobs_cancel(
251250
initialized_app: FastAPI,
252251
register_rpc_routes: None,
253252
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
254-
with_storage_celery_worker: CeleryTaskWorker,
253+
with_storage_celery_worker: CeleryTaskManager,
255254
user_id: UserID,
256255
product_name: ProductName,
257256
exposed_rpc_start: str,
@@ -316,7 +315,7 @@ async def test_async_jobs_raises(
316315
initialized_app: FastAPI,
317316
register_rpc_routes: None,
318317
storage_rabbitmq_rpc_client: RabbitMQRPCClient,
319-
with_storage_celery_worker: CeleryTaskWorker,
318+
with_storage_celery_worker: CeleryTaskManager,
320319
user_id: UserID,
321320
product_name: ProductName,
322321
exposed_rpc_start: str,

services/storage/tests/unit/test_handlers_files.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from aiohttp import ClientSession
2424
from aws_library.s3 import S3KeyNotFoundError, S3ObjectKey, SimcoreS3API
2525
from aws_library.s3._constants import MULTIPART_UPLOADS_MIN_TOTAL_SIZE
26-
from celery_library.worker import CeleryTaskWorker
26+
from celery_library.task_manager import CeleryTaskManager
2727
from faker import Faker
2828
from fastapi import FastAPI
2929
from models_library.api_schemas_storage.storage_schemas import (
@@ -683,7 +683,7 @@ async def test_upload_real_file_with_s3_client(
683683
node_id: NodeID,
684684
faker: Faker,
685685
s3_client: S3Client,
686-
with_storage_celery_worker: CeleryTaskWorker,
686+
with_storage_celery_worker: CeleryTaskManager,
687687
):
688688
file_size = TypeAdapter(ByteSize).validate_python("500Mib")
689689
file_name = faker.file_name()

0 commit comments

Comments
 (0)