Skip to content

Commit 7a36705

Browse files
[DOP-21799] Refactor celery initialization (#165)
1 parent 60ab590 commit 7a36705

File tree

16 files changed

+103
-74
lines changed

16 files changed

+103
-74
lines changed

docker/entrypoint_worker.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@ set -e
66

77
# exec is required to forward all signals to the main process
88
exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"
9-

docs/conf.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,6 @@
147147
# If true, `todo` and `todoList` produce output, else they produce nothing.
148148
todo_include_todos = False
149149

150-
# set the environment variable before imports
151-
# TODO: remove after global init of WorkerAppSettings in worker/__init__.py
152-
os.environ["SYNCMASTER__ENCRYPTION__CRYPTO_KEY"] = "crypto_key"
153-
os.environ["SYNCMASTER__DATABASE__URL"] = "postgresql+asyncpg://syncmaster:changeme@db:5432/syncmaster"
154-
os.environ["SYNCMASTER__BROKER__URL"] = "amqp://guest:guest@localhost:5672/"
155150

156151
# -- Options for HTMLHelp output ------------------------------------------
157152

syncmaster/backend/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from celery import Celery
34
from fastapi import FastAPI, HTTPException
45
from fastapi.exceptions import RequestValidationError
56
from pydantic import ValidationError
@@ -20,6 +21,15 @@
2021
from syncmaster.exceptions import SyncmasterError
2122

2223

24+
def celery_factory(settings: Settings) -> Celery:
25+
app = Celery(
26+
__name__,
27+
broker=settings.broker.url,
28+
backend="db+" + settings.database.sync_url,
29+
)
30+
return app
31+
32+
2333
def application_factory(settings: Settings) -> FastAPI:
2434
application = FastAPI(
2535
title="Syncmaster",
@@ -30,6 +40,7 @@ def application_factory(settings: Settings) -> FastAPI:
3040
redoc_url=None,
3141
)
3242
application.state.settings = settings
43+
application.state.celery = celery_factory(settings)
3344
application.include_router(api_router)
3445
application.exception_handler(RequestValidationError)(validation_exception_handler)
3546
application.exception_handler(ValidationError)(validation_exception_handler)
@@ -44,6 +55,7 @@ def application_factory(settings: Settings) -> FastAPI:
4455
{
4556
Settings: lambda: settings,
4657
UnitOfWork: get_uow(session_factory, settings=settings),
58+
Celery: lambda: application.state.celery,
4759
},
4860
)
4961

syncmaster/backend/api/v1/runs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from typing import Annotated
66

77
from asgi_correlation_id import correlation_id
8+
from celery import Celery
89
from fastapi import APIRouter, Depends, Query
910
from jinja2 import Template
1011
from kombu.exceptions import KombuError
@@ -24,7 +25,6 @@
2425
ReadRunSchema,
2526
RunPageSchema,
2627
)
27-
from syncmaster.worker import celery
2828

2929
router = APIRouter(tags=["Runs"], responses=get_error_responses())
3030

@@ -84,6 +84,7 @@ async def read_run(
8484
async def start_run(
8585
create_run_data: CreateRunSchema,
8686
settings: Annotated[Settings, Depends(Stub(Settings))],
87+
celery: Annotated[Celery, Depends(Stub(Celery))],
8788
unit_of_work: UnitOfWork = Depends(UnitOfWork),
8889
current_user: User = Depends(get_user(is_active=True)),
8990
) -> ReadRunSchema:

syncmaster/scheduler/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from syncmaster.scheduler.transfer_fetcher import TransferFetcher
4-
from syncmaster.scheduler.transfer_job_manager import TransferJobManager
3+
from celery import Celery
4+
5+
from syncmaster.scheduler.settings import SchedulerAppSettings
6+
7+
8+
def celery_factory(settings: SchedulerAppSettings) -> Celery:
9+
app = Celery(
10+
__name__,
11+
broker=settings.broker.url,
12+
backend="db+" + settings.database.sync_url,
13+
)
14+
return app

syncmaster/scheduler/celery.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.scheduler import celery_factory
4+
from syncmaster.scheduler.settings import SchedulerAppSettings
5+
6+
# Global object, since the TransferJobManager.send_job_to_celery method is static
7+
app = celery_factory(SchedulerAppSettings())

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
from syncmaster.backend.services.unit_of_work import UnitOfWork
1010
from syncmaster.db.models import RunType, Status, Transfer
1111
from syncmaster.exceptions.run import CannotConnectToTaskQueueError
12+
from syncmaster.scheduler.celery import app as celery
1213
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1314
from syncmaster.scheduler.utils import get_async_session
1415
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
15-
from syncmaster.worker import celery
1616

1717

1818
class TransferJobManager:
@@ -50,8 +50,11 @@ def update_jobs(self, transfers: list[Transfer]) -> None:
5050
@staticmethod
5151
async def send_job_to_celery(transfer_id: int) -> None:
5252
"""
53-
Do not pass additional arguments like settings,
53+
1. Do not pass additional arguments like settings,
5454
otherwise they will be serialized in jobs table.
55+
2. Instance methods are bound to specific objects and cannot be reliably serialized
56+
due to the weak reference problem. Use a static method instead, as it is not
57+
object-specific and can be serialized.
5558
"""
5659
settings = Settings()
5760

syncmaster/worker/__init__.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from syncmaster.worker.settings import WorkerAppSettings
77

88

9-
def create_celery_app(settings) -> Celery:
9+
def celery_factory(settings: WorkerAppSettings) -> Celery:
1010
app = Celery(
1111
__name__,
1212
broker=settings.broker.url,
@@ -17,8 +17,3 @@ def create_celery_app(settings) -> Celery:
1717
],
1818
)
1919
return app
20-
21-
22-
# TODO: initialize celery app in __name__ == "__main__"
23-
# then initialize celery app in backend via dependency injection and initialize in scheduler
24-
celery = create_celery_app(WorkerAppSettings())

syncmaster/worker/celery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.worker import celery_factory
4+
from syncmaster.worker.settings import WorkerAppSettings
5+
6+
app = celery_factory(WorkerAppSettings())

syncmaster/worker/transfer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from syncmaster.db.repositories.utils import decrypt_auth_data
1616
from syncmaster.exceptions.run import RunNotFoundError
1717
from syncmaster.settings.log import setup_logging
18-
from syncmaster.worker import celery
18+
from syncmaster.worker.celery import app as celery
1919
from syncmaster.worker.controller import TransferController
2020
from syncmaster.worker.settings import WorkerAppSettings
2121

0 commit comments

Comments
 (0)