Skip to content

Commit ac11201

Browse files
author
Ilyas Gasanov
committed
[DOP-21799] Refactor celery initialization
1 parent 584c993 commit ac11201

File tree

11 files changed

+25
-15
lines changed

11 files changed

+25
-15
lines changed

docker/entrypoint_worker.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@ set -e
66

77
# exec is required to forward all signals to the main process
88
exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"
9-

syncmaster/backend/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from celery import Celery
34
from fastapi import FastAPI, HTTPException
45
from fastapi.exceptions import RequestValidationError
56
from pydantic import ValidationError
@@ -18,6 +19,7 @@
1819
from syncmaster.backend.settings import ServerAppSettings as Settings
1920
from syncmaster.db.factory import create_session_factory, get_uow
2021
from syncmaster.exceptions import SyncmasterError
22+
from syncmaster.worker import celery_factory
2123

2224

2325
def application_factory(settings: Settings) -> FastAPI:
@@ -30,6 +32,7 @@ def application_factory(settings: Settings) -> FastAPI:
3032
redoc_url=None,
3133
)
3234
application.state.settings = settings
35+
application.state.celery = celery_factory(settings)
3336
application.include_router(api_router)
3437
application.exception_handler(RequestValidationError)(validation_exception_handler)
3538
application.exception_handler(ValidationError)(validation_exception_handler)
@@ -44,6 +47,7 @@ def application_factory(settings: Settings) -> FastAPI:
4447
{
4548
Settings: lambda: settings,
4649
UnitOfWork: get_uow(session_factory, settings=settings),
50+
Celery: lambda: application.state.celery,
4751
},
4852
)
4953

syncmaster/backend/api/v1/runs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import Annotated
66

77
from asgi_correlation_id import correlation_id
8+
from celery import Celery
89
from fastapi import APIRouter, Depends, Query
910
from jinja2 import Template
1011
from kombu.exceptions import KombuError
@@ -24,7 +25,6 @@
2425
ReadRunSchema,
2526
RunPageSchema,
2627
)
27-
from syncmaster.worker import celery
2828

2929
router = APIRouter(tags=["Runs"], responses=get_error_responses())
3030

@@ -84,6 +84,7 @@ async def read_run(
8484
async def start_run(
8585
create_run_data: CreateRunSchema,
8686
settings: Annotated[Settings, Depends(Stub(Settings))],
87+
celery: Annotated[Celery, Depends(Stub(Celery))],
8788
unit_of_work: UnitOfWork = Depends(UnitOfWork),
8889
current_user: User = Depends(get_user(is_active=True)),
8990
) -> ReadRunSchema:

syncmaster/scheduler/celery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.scheduler.settings import SchedulerAppSettings
4+
from syncmaster.worker import celery_factory
5+
6+
celery = celery_factory(SchedulerAppSettings())

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
from syncmaster.backend.services.unit_of_work import UnitOfWork
1010
from syncmaster.db.models import RunType, Status, Transfer
1111
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
12+
from syncmaster.scheduler.celery import celery
1213
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1314
from syncmaster.scheduler.utils import get_async_session
1415
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
15-
from syncmaster.worker import celery
1616

1717

1818
class TransferJobManager:

syncmaster/worker/__init__.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@
33
from celery import Celery
44

55
from syncmaster.worker.base import WorkerTask
6-
from syncmaster.worker.settings import WorkerAppSettings
76

87

9-
def create_celery_app(settings) -> Celery:
8+
def celery_factory(settings) -> Celery:
109
app = Celery(
1110
__name__,
1211
broker=settings.broker.url,
@@ -17,8 +16,3 @@ def create_celery_app(settings) -> Celery:
1716
],
1817
)
1918
return app
20-
21-
22-
# TODO: initialize celery app in __name__ == "__main__"
23-
# then initialize celery app in backend via dependency injection and initialize in scheduler
24-
celery = create_celery_app(WorkerAppSettings())

syncmaster/worker/celery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.worker import celery_factory
4+
from syncmaster.worker.settings import WorkerAppSettings
5+
6+
celery = celery_factory(WorkerAppSettings())

syncmaster/worker/transfer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from syncmaster.db.models import AuthData, Run, Status, Transfer
1717
from syncmaster.db.repositories.utils import decrypt_auth_data
1818
from syncmaster.exceptions.run import RunNotFoundError
19-
from syncmaster.worker import celery
19+
from syncmaster.worker.celery import celery
2020
from syncmaster.worker.controller import TransferController
2121
from syncmaster.worker.settings import WorkerAppSettings
2222

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from syncmaster.backend.utils.jwt import sign_jwt
2424
from syncmaster.db.models import Base
2525
from syncmaster.scheduler.settings import SchedulerAppSettings
26-
from syncmaster.worker import create_celery_app
26+
from syncmaster.worker import celery_factory
2727
from syncmaster.worker.settings import WorkerAppSettings
2828
from tests.mocks import UserTestRoles
2929
from tests.settings import TestSettings
@@ -140,7 +140,7 @@ async def client(settings: Settings) -> AsyncGenerator:
140140

141141
@pytest.fixture(scope="session", params=[{}])
142142
def celery(worker_settings: WorkerAppSettings) -> Celery:
143-
celery_app = create_celery_app(worker_settings)
143+
celery_app = celery_factory(worker_settings)
144144
return celery_app
145145

146146

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
from syncmaster.worker import celery
1+
from syncmaster.scheduler.celery import celery
22

33
celery.conf.update(imports=list(celery.conf.imports) + ["tests.test_integration.test_scheduler.test_task"])

0 commit comments

Comments
 (0)