File tree Expand file tree Collapse file tree 7 files changed +18
-22
lines changed
Expand file tree Collapse file tree 7 files changed +18
-22
lines changed Original file line number Diff line number Diff line change 2222 RunPageSchema ,
2323)
2424from syncmaster .worker .config import celery
25-
26- # TODO: remove global import of WorkerSettings
27- from syncmaster .worker .settings import WorkerSettings as Settings
25+ from syncmaster .worker .settings import get_worker_settings
2826
2927router = APIRouter (tags = ["Runs" ], responses = get_error_responses ())
3028
@@ -119,7 +117,7 @@ async def start_run(
119117 type = RunType .MANUAL ,
120118 )
121119
122- log_url = Template (Settings ().LOG_URL_TEMPLATE ).render (
120+ log_url = Template (get_worker_settings ().LOG_URL_TEMPLATE ).render (
123121 run = run ,
124122 correlation_id = correlation_id .get (),
125123 )
Original file line number Diff line number Diff line change @@ -13,16 +13,16 @@ class SchedulerSettings(SyncmasterSettings):
1313
1414 .. code-block:: bash
1515
16- SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT =200
16+ SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS =200
1717 """
1818
1919 TRANSFER_FETCHING_TIMEOUT_SECONDS : int = Field (
2020 180 ,
2121 description = "Timeout for fetching transfers in seconds" ,
22- alias = "SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS " ,
22+ alias = "SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS " ,
2323 )
2424 MISFIRE_GRACE_TIME_SECONDS : int = Field (
2525 300 ,
2626 description = "Grace time for misfired jobs in seconds" ,
27- alias = "SCHEDULER__MISFIRE_GRACE_TIME_SECONDS " ,
27+ alias = "SYNCMASTER__SCHEDULER__MISFIRE_GRACE_TIME_SECONDS " ,
2828 )
Original file line number Diff line number Diff line change 33from celery import Task
44from sqlalchemy import create_engine
55
6- # TODO: remove global import of WorkerSettings
7- from syncmaster .worker .settings import WorkerSettings as Settings
6+ from syncmaster .worker .settings import get_worker_settings
87
98
109class WorkerTask (Task ):
1110 def __init__ (self ) -> None :
12- self .settings = Settings ()
11+ self .settings = get_worker_settings ()
1312 self .engine = create_engine (
1413 url = self .settings .database .sync_url ,
1514 )
Original file line number Diff line number Diff line change 33from celery import Celery
44
55from syncmaster .worker .base import WorkerTask
6+ from syncmaster .worker .settings import get_worker_settings
67
7- # TODO: remove global import of WorkerSettings
8- from syncmaster .worker .settings import WorkerSettings as Settings
9-
10- worker_settings = Settings ()
8+ worker_settings = get_worker_settings ()
119celery = Celery (
1210 __name__ ,
1311 broker = worker_settings .broker .url ,
Original file line number Diff line number Diff line change 2525from syncmaster .worker .handlers .db .postgres import PostgresHandler
2626from syncmaster .worker .handlers .file .hdfs import HDFSHandler
2727from syncmaster .worker .handlers .file .s3 import S3Handler
28-
29- # TODO: remove global import of WorkerSettings
30- from syncmaster .worker .settings import WorkerSettings as Settings
28+ from syncmaster .worker .settings import WorkerSettings
3129
3230logger = logging .getLogger (__name__ )
3331
@@ -85,8 +83,8 @@ def __init__(
8583 connection_auth_data = target_auth_data ,
8684 )
8785
88- def perform_transfer (self ) -> None :
89- spark = Settings () .CREATE_SPARK_SESSION_FUNCTION (
86+ def perform_transfer (self , settings : WorkerSettings ) -> None :
87+ spark = settings .CREATE_SPARK_SESSION_FUNCTION (
9088 run = self .run ,
9189 source = self .source_handler .connection_dto ,
9290 target = self .target_handler .connection_dto ,
Original file line number Diff line number Diff line change @@ -32,3 +32,7 @@ class WorkerSettings(SyncmasterSettings):
3232 description = "Function to create Spark session for worker" ,
3333 alias = "SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION" ,
3434 )
35+
36+
37+ def get_worker_settings (settings : WorkerSettings | None = None ) -> WorkerSettings :
38+ return WorkerSettings () if settings is None else settings
Original file line number Diff line number Diff line change 1818from syncmaster .worker .base import WorkerTask
1919from syncmaster .worker .config import celery
2020from syncmaster .worker .controller import TransferController
21- from syncmaster .worker .settings import WorkerSettings as WorkerSettings
21+ from syncmaster .worker .settings import get_worker_settings
2222
2323logger = get_task_logger (__name__ )
2424
25- # TODO: remove global import of WorkerSettings
26- CORRELATION_CELERY_HEADER_ID = WorkerSettings ().CORRELATION_CELERY_HEADER_ID
25+ CORRELATION_CELERY_HEADER_ID = get_worker_settings ().CORRELATION_CELERY_HEADER_ID
2726
2827
2928@celery .task (name = "run_transfer_task" , bind = True , track_started = True )
You can’t perform that action at this time.
0 commit comments