Skip to content

Commit 80e780d

Browse files
author
maxim-lixakov
committed
[DOP-19992] - create celery app with provided settings
1 parent eb73767 commit 80e780d

File tree

15 files changed

+69
-61
lines changed

15 files changed

+69
-61
lines changed

syncmaster/backend/api/v1/runs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
ReadRunSchema,
2222
RunPageSchema,
2323
)
24-
from syncmaster.worker.config import celery
25-
from syncmaster.worker.settings import get_worker_settings
24+
from syncmaster.worker.settings import WorkerAppSettings
25+
from syncmaster.worker.tasks import celery
2626

2727
router = APIRouter(tags=["Runs"], responses=get_error_responses())
2828

@@ -117,7 +117,7 @@ async def start_run(
117117
type=RunType.MANUAL,
118118
)
119119

120-
log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render(
120+
log_url = Template(WorkerAppSettings().worker.LOG_URL_TEMPLATE).render(
121121
run=run,
122122
correlation_id=correlation_id.get(),
123123
)

syncmaster/scheduler/transfer_fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sqlalchemy import select
44

55
from syncmaster.db.models import Transfer
6-
from syncmaster.scheduler.settings import SchedulerSettings as Settings
6+
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
77
from syncmaster.scheduler.utils import get_async_session
88

99

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1313
from syncmaster.scheduler.utils import get_async_session
1414
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
15-
from syncmaster.worker.config import celery
15+
from syncmaster.worker.tasks import celery
1616

1717

1818
class TransferJobManager:

syncmaster/worker/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,18 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from celery import Celery
4+
5+
from syncmaster.worker.base import WorkerTask
6+
7+
8+
def create_celery_app(settings) -> Celery:
9+
app = Celery(
10+
__name__,
11+
broker=settings.broker.url,
12+
backend="db+" + settings.database.sync_url,
13+
task_cls=WorkerTask(settings=settings),
14+
imports=[
15+
"syncmaster.worker.transfer",
16+
],
17+
)
18+
return app

syncmaster/worker/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from celery import Task
44
from sqlalchemy import create_engine
55

6-
from syncmaster.worker.settings import get_worker_settings
6+
from syncmaster.worker.settings import WorkerAppSettings
77

88

99
class WorkerTask(Task):
10-
def __init__(self) -> None:
11-
self.settings = get_worker_settings()
10+
def __init__(self, settings: WorkerAppSettings) -> None:
11+
self.settings = settings
1212
self.engine = create_engine(
1313
url=self.settings.database.sync_url,
1414
)

syncmaster/worker/config.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

syncmaster/worker/settings/__init__.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
import importlib
4-
import os
5-
63
from pydantic import Field
74
from pydantic.types import ImportString
85
from pydantic_settings import BaseSettings
@@ -54,16 +51,3 @@ class WorkerAppSettings(BaseSettings):
5451
class Config:
5552
env_prefix = "SYNCMASTER__"
5653
env_nested_delimiter = "__"
57-
58-
59-
def get_worker_settings() -> WorkerAppSettings:
60-
# TODO: add to worker documentation
61-
worker_settings_path = os.environ.get("WORKER_SETTINGS", None)
62-
63-
if worker_settings_path:
64-
module_name, class_name = worker_settings_path.rsplit(".", 1)
65-
module = importlib.import_module(module_name)
66-
settings_class = getattr(module, class_name)
67-
else:
68-
settings_class = WorkerAppSettings
69-
return settings_class()

syncmaster/worker/tasks.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.worker import create_celery_app
4+
from syncmaster.worker.settings import WorkerAppSettings
5+
from syncmaster.worker.transfer import run_transfer_task
6+
7+
8+
def register_tasks(celery_app):
9+
celery_app.task(name="run_transfer_task", bind=True, track_started=True)(run_transfer_task)
10+
11+
12+
celery = create_celery_app(WorkerAppSettings())
13+
register_tasks(celery)

syncmaster/worker/transfer.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,14 @@
1616
from syncmaster.db.repositories.utils import decrypt_auth_data
1717
from syncmaster.exceptions.run import RunNotFoundError
1818
from syncmaster.worker.base import WorkerTask
19-
from syncmaster.worker.config import celery
2019
from syncmaster.worker.controller import TransferController
21-
from syncmaster.worker.settings import get_worker_settings
20+
from syncmaster.worker.settings import WorkerAppSettings
2221

2322
logger = get_task_logger(__name__)
2423

25-
CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID
24+
CORRELATION_CELERY_HEADER_ID = WorkerAppSettings().worker.CORRELATION_CELERY_HEADER_ID
2625

2726

28-
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2927
def run_transfer_task(self: WorkerTask, run_id: int) -> None:
3028
onetl.log.setup_logging(level=logging.INFO)
3129
with Session(self.engine) as session:

tests/conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pytest
99
import pytest_asyncio
1010
from alembic.config import Config as AlembicConfig
11+
from celery import Celery
1112
from httpx import AsyncClient
1213
from sqlalchemy.ext.asyncio import (
1314
AsyncEngine,
@@ -22,6 +23,8 @@
2223
from syncmaster.backend.utils.jwt import sign_jwt
2324
from syncmaster.db.models import Base
2425
from syncmaster.scheduler.settings import SchedulerAppSettings
26+
from syncmaster.worker import create_celery_app
27+
from syncmaster.worker.settings import WorkerAppSettings
2528
from tests.mocks import UserTestRoles
2629
from tests.settings import TestSettings
2730
from tests.utils import prepare_new_database, run_async_migrations
@@ -77,6 +80,11 @@ def scheduler_settings(request: pytest.FixtureRequest) -> SchedulerAppSettings:
7780
return SchedulerAppSettings.parse_obj(request.param)
7881

7982

83+
@pytest.fixture(scope="session", params=[{}])
84+
def worker_settings(request: pytest.FixtureRequest) -> WorkerAppSettings:
85+
return WorkerAppSettings.parse_obj(request.param)
86+
87+
8088
@pytest.fixture(scope="session")
8189
def test_settings():
8290
return TestSettings()
@@ -130,6 +138,12 @@ async def client(settings: Settings) -> AsyncGenerator:
130138
logger.info("END CLIENT FIXTURE")
131139

132140

141+
@pytest.fixture(scope="session", params=[{}])
142+
def celery(worker_settings: WorkerAppSettings) -> Celery:
143+
celery_app = create_celery_app(worker_settings)
144+
return celery_app
145+
146+
133147
@pytest_asyncio.fixture
134148
async def create_connection_data(request):
135149
if hasattr(request, "param"):

0 commit comments

Comments
 (0)