Skip to content

Commit e1ef97e

Browse files
author
maxim-lixakov
committed
[DOP-19992] - create celery app with provided settings
1 parent eb73767 commit e1ef97e

File tree

16 files changed

+83
-64
lines changed

16 files changed

+83
-64
lines changed

syncmaster/backend/api/v1/runs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
ReadRunSchema,
2222
RunPageSchema,
2323
)
24-
from syncmaster.worker.config import celery
25-
from syncmaster.worker.settings import get_worker_settings
24+
from syncmaster.worker.settings import WorkerAppSettings
25+
from syncmaster.worker.tasks import celery
2626

2727
router = APIRouter(tags=["Runs"], responses=get_error_responses())
2828

@@ -117,7 +117,7 @@ async def start_run(
117117
type=RunType.MANUAL,
118118
)
119119

120-
log_url = Template(get_worker_settings().worker.LOG_URL_TEMPLATE).render(
120+
log_url = Template(WorkerAppSettings().worker.LOG_URL_TEMPLATE).render(
121121
run=run,
122122
correlation_id=correlation_id.get(),
123123
)
Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from pydantic import BaseModel, Field, SecretStr
3+
from pydantic import Field, SecretStr
4+
from pydantic_settings import BaseSettings
45

56

6-
class KeycloakSettings(BaseModel):
7+
class KeycloakSettings(BaseSettings):
78

89
server_url: str = Field(..., description="Keycloak server URL")
910
client_id: str = Field(..., description="Keycloak client ID")
@@ -13,10 +14,20 @@ class KeycloakSettings(BaseModel):
1314
verify_ssl: bool = Field(True, description="Verify SSL certificates")
1415
scope: str = Field("openid", description="Keycloak scope")
1516

17+
class Config:
18+
env_prefix = "SYNCMASTER__"
19+
env_nested_delimiter = "__"
20+
extra = "allow"
1621

17-
class KeycloakAuthProviderSettings(BaseModel):
22+
23+
class KeycloakAuthProviderSettings(BaseSettings):
1824
"""Settings related to Keycloak interaction."""
1925

2026
keycloak: KeycloakSettings = Field(
2127
description="Keycloak settings",
2228
)
29+
30+
class Config:
31+
env_prefix = "SYNCMASTER__"
32+
env_nested_delimiter = "__"
33+
extra = "allow"

syncmaster/scheduler/transfer_fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from sqlalchemy import select
44

55
from syncmaster.db.models import Transfer
6-
from syncmaster.scheduler.settings import SchedulerSettings as Settings
6+
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
77
from syncmaster.scheduler.utils import get_async_session
88

99

syncmaster/scheduler/transfer_job_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from syncmaster.scheduler.settings import SchedulerAppSettings as Settings
1313
from syncmaster.scheduler.utils import get_async_session
1414
from syncmaster.schemas.v1.connections.connection import ReadAuthDataSchema
15-
from syncmaster.worker.config import celery
15+
from syncmaster.worker.tasks import celery
1616

1717

1818
class TransferJobManager:

syncmaster/worker/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,18 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3+
from celery import Celery
4+
5+
from syncmaster.worker.base import WorkerTask
6+
7+
8+
def create_celery_app(settings) -> Celery:
9+
app = Celery(
10+
__name__,
11+
broker=settings.broker.url,
12+
backend="db+" + settings.database.sync_url,
13+
task_cls=WorkerTask(settings=settings),
14+
imports=[
15+
"syncmaster.worker.transfer",
16+
],
17+
)
18+
return app

syncmaster/worker/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
from celery import Task
44
from sqlalchemy import create_engine
55

6-
from syncmaster.worker.settings import get_worker_settings
6+
from syncmaster.worker.settings import WorkerAppSettings
77

88

99
class WorkerTask(Task):
10-
def __init__(self) -> None:
11-
self.settings = get_worker_settings()
10+
def __init__(self, settings: WorkerAppSettings) -> None:
11+
self.settings = settings
1212
self.engine = create_engine(
1313
url=self.settings.database.sync_url,
1414
)

syncmaster/worker/config.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

syncmaster/worker/settings/__init__.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
import importlib
4-
import os
5-
63
from pydantic import Field
74
from pydantic.types import ImportString
85
from pydantic_settings import BaseSettings
@@ -54,16 +51,3 @@ class WorkerAppSettings(BaseSettings):
5451
class Config:
5552
env_prefix = "SYNCMASTER__"
5653
env_nested_delimiter = "__"
57-
58-
59-
def get_worker_settings() -> WorkerAppSettings:
60-
# TODO: add to worker documentation
61-
worker_settings_path = os.environ.get("WORKER_SETTINGS", None)
62-
63-
if worker_settings_path:
64-
module_name, class_name = worker_settings_path.rsplit(".", 1)
65-
module = importlib.import_module(module_name)
66-
settings_class = getattr(module, class_name)
67-
else:
68-
settings_class = WorkerAppSettings
69-
return settings_class()

syncmaster/worker/tasks.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from syncmaster.worker import create_celery_app
4+
from syncmaster.worker.settings import WorkerAppSettings
5+
from syncmaster.worker.transfer import run_transfer_task
6+
7+
8+
def register_tasks(celery_app):
9+
celery_app.task(name="run_transfer_task", bind=True, track_started=True)(run_transfer_task)
10+
11+
12+
celery = create_celery_app(WorkerAppSettings())
13+
register_tasks(celery)

syncmaster/worker/transfer.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,14 @@
1616
from syncmaster.db.repositories.utils import decrypt_auth_data
1717
from syncmaster.exceptions.run import RunNotFoundError
1818
from syncmaster.worker.base import WorkerTask
19-
from syncmaster.worker.config import celery
2019
from syncmaster.worker.controller import TransferController
21-
from syncmaster.worker.settings import get_worker_settings
20+
from syncmaster.worker.settings import WorkerAppSettings
2221

2322
logger = get_task_logger(__name__)
2423

25-
CORRELATION_CELERY_HEADER_ID = get_worker_settings().worker.CORRELATION_CELERY_HEADER_ID
24+
CORRELATION_CELERY_HEADER_ID = WorkerAppSettings().worker.CORRELATION_CELERY_HEADER_ID
2625

2726

28-
@celery.task(name="run_transfer_task", bind=True, track_started=True)
2927
def run_transfer_task(self: WorkerTask, run_id: int) -> None:
3028
onetl.log.setup_logging(level=logging.INFO)
3129
with Session(self.engine) as session:

0 commit comments

Comments
 (0)