Skip to content

Commit 5c50952

Browse files
[DOP-22135] Add resources field to Transfer model (#214)
1 parent 093c4db commit 5c50952

File tree

20 files changed

+119
-14
lines changed

20 files changed

+119
-14
lines changed
Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Add `resources` field to `Transfer` model

docs/design/entities.rst

Lines changed: 11 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -49,7 +49,7 @@ Example:
4949
.. code-block:: json
5050
5151
{
52-
"group_id": "1",
52+
"group_id": 1,
5353
"name": "Beautiful name",
5454
"description": "What a great connection !",
5555
"type": "postgres",
@@ -93,17 +93,18 @@ Example:
9393
.. code-block:: json
9494
9595
{
96-
"group_id": "1",
97-
"queue_id": "1",
96+
"group_id": 1,
97+
"queue_id": 1,
9898
"name": "My beautiful transfer.",
9999
"description": "What a great transfer !",
100-
"is_scheduled": false,
101-
"schedule": "",
102-
"source_connection_id": "1",
103-
"target_connection_id": "2",
104-
"source_params": "{'type': 'postgres', 'table_name': 'source_table'}",
105-
"target_params": "{'type': 'postgres', 'table_name': 'target_table'}",
106-
"strategy_params": "{'type': 'full'}",
100+
"is_scheduled": true,
101+
"schedule": "0 0 * * *",
102+
"source_connection_id": 1,
103+
"target_connection_id": 2,
104+
"source_params": {"type": "postgres", "table_name": "source_table"},
105+
"target_params": {"type": "mysql", "table_name": "target_table"},
106+
"strategy_params": {"type": "full"},
107+
"resources": {"max_parallel_tasks": 2, "cpu_cores_per_task": 2, "ram_bytes_per_task": "1 GiB"}
107108
}
108109
109110
Run

syncmaster/db/migrations/versions/2023-11-23_0007_create_transfer_table.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ def upgrade():
4545
sa.Column("source_params", sa.JSON(), nullable=False),
4646
sa.Column("target_params", sa.JSON(), nullable=False),
4747
sa.Column("transformations", sa.JSON(), nullable=False),
48+
sa.Column("resources", sa.JSON(), nullable=False),
4849
sa.Column("is_scheduled", sa.Boolean(), nullable=False),
4950
sa.Column("schedule", sa.String(length=32), nullable=False),
5051
sa.Column("queue_id", sa.BigInteger(), nullable=False),

syncmaster/db/models/transfer.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ class Transfer(
4747
source_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
4848
target_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
4949
transformations: Mapped[list[dict[str, Any]]] = mapped_column(JSON, nullable=False, default=list)
50+
resources: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
5051
is_scheduled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
5152
schedule: Mapped[str] = mapped_column(String(32), nullable=False, default="")
5253
queue_id: Mapped[int] = mapped_column(

syncmaster/db/repositories/transfer.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ async def create(
116116
target_params: dict[str, Any],
117117
strategy_params: dict[str, Any],
118118
transformations: list[dict[str, Any]],
119+
resources: dict[str, Any],
119120
queue_id: int,
120121
is_scheduled: bool,
121122
schedule: str | None,
@@ -132,6 +133,7 @@ async def create(
132133
target_params=target_params,
133134
strategy_params=strategy_params,
134135
transformations=transformations,
136+
resources=resources,
135137
queue_id=queue_id,
136138
is_scheduled=is_scheduled,
137139
schedule=schedule or "",
@@ -157,6 +159,7 @@ async def update(
157159
target_params: dict[str, Any],
158160
strategy_params: dict[str, Any],
159161
transformations: list[dict[str, Any]],
162+
resources: dict[str, Any],
160163
is_scheduled: bool | None,
161164
schedule: str | None,
162165
new_queue_id: int | None,
@@ -183,6 +186,7 @@ async def update(
183186
source_params=source_params,
184187
target_params=target_params,
185188
transformations=transformations or transfer.transformations,
189+
resources=resources or transfer.resources,
186190
queue_id=new_queue_id or transfer.queue_id,
187191
)
188192
except IntegrityError as e:

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,7 @@
5959
WebDAVReadTransferSource,
6060
WebDAVReadTransferTarget,
6161
)
62+
from syncmaster.schemas.v1.transfers.resources import Resources
6263
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
6364
from syncmaster.schemas.v1.transfers.transformations.dataframe_columns_filter import (
6465
DataframeColumnsFilter,
@@ -206,6 +207,9 @@ class ReadTransferSchema(BaseModel):
206207
transformations: list[Annotated[TransformationSchema, Field(..., discriminator="type")]] = Field(
207208
default_factory=list,
208209
)
210+
resources: Resources = Field(
211+
...,
212+
)
209213

210214
class Config:
211215
from_attributes = True
@@ -238,6 +242,10 @@ class CreateTransferSchema(BaseModel):
238242
transformations: list[
239243
Annotated[TransformationSchema, Field(None, discriminator="type", description="List of transformations")]
240244
] = Field(default_factory=list)
245+
resources: Resources = Field(
246+
default_factory=Resources,
247+
description="Transfer resources",
248+
)
241249

242250
@model_validator(mode="before")
243251
def validate_scheduling(cls, values):
@@ -283,6 +291,7 @@ class UpdateTransferSchema(BaseModel):
283291
target_params: UpdateTransferSchemaTarget = Field(discriminator="type", default=None)
284292
strategy_params: FullStrategy | IncrementalStrategy | None = Field(discriminator="type", default=None)
285293
transformations: list[Annotated[TransformationSchema, Field(discriminator="type", default=None)]] = None
294+
resources: Resources | None = Field(default=None)
286295

287296

288297
class ReadFullTransferSchema(ReadTransferSchema):
Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,14 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from pydantic import BaseModel, ByteSize, Field


class Resources(BaseModel):
    """Per-transfer resource limits applied to each worker executor.

    Stored as JSON on the ``Transfer`` model and forwarded to the Spark
    session configuration (``spark.executor.instances`` / ``cores`` /
    ``memory``) by the worker.
    """

    # How many executors may run in parallel for a single transfer run.
    max_parallel_tasks: int = Field(default=1, ge=1, le=100, description="Parallel executors")
    # CPU cores allocated to each executor.
    cpu_cores_per_task: int = Field(default=1, ge=1, le=32, description="Cores per executor")
    # RAM per executor; ByteSize also accepts human-readable strings ("1 GiB").
    # Bounds: 512 MiB .. 64 GiB, default 1 GiB.
    ram_bytes_per_task: ByteSize = Field(
        default=1024**3,
        ge=512 * 1024**2,
        le=64 * 1024**3,
        description="RAM per executor",
    )

syncmaster/schemas/v1/transfers/strategy.py

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,5 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
3-
from __future__ import annotations
4-
53
from pydantic import BaseModel
64

75
from syncmaster.schemas.v1.transfer_types import FULL_TYPE, INCREMENTAL_TYPE

syncmaster/server/api/v1/transfers.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -131,6 +131,7 @@ async def create_transfer(
131131
target_params=transfer_data.target_params.dict(),
132132
strategy_params=transfer_data.strategy_params.dict(),
133133
transformations=[tr.dict() for tr in transfer_data.transformations],
134+
resources=transfer_data.resources.dict(),
134135
queue_id=transfer_data.queue_id,
135136
is_scheduled=transfer_data.is_scheduled,
136137
schedule=transfer_data.schedule,
@@ -330,6 +331,7 @@ async def update_transfer(
330331
transformations=(
331332
[tr.dict() for tr in transfer_data.transformations] if transfer_data.transformations else []
332333
),
334+
resources=transfer_data.resources.dict() if transfer_data.resources else {},
333335
is_scheduled=transfer_data.is_scheduled,
334336
schedule=transfer_data.schedule,
335337
new_queue_id=transfer_data.new_queue_id,

syncmaster/worker/spark.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ def get_worker_spark_session(
2525
name = run.transfer.group.name + "_" + run.transfer.name # noqa: WPS336
2626
spark_builder = SparkSession.builder.appName(f"syncmaster_{name}")
2727

28-
for k, v in get_spark_session_conf(source, target).items():
28+
for k, v in get_spark_session_conf(source, target, run.transfer.resources).items():
2929
spark_builder = spark_builder.config(k, v)
3030

3131
if source.type == "hive" or target.type == "hive": # type: ignore
@@ -81,6 +81,7 @@ def get_excluded_packages(db_type: str) -> list[str]:
8181
def get_spark_session_conf(
8282
source: ConnectionDTO,
8383
target: ConnectionDTO,
84+
resources: dict,
8485
) -> dict:
8586
maven_packages: list[str] = []
8687
excluded_packages: list[str] = []
@@ -93,6 +94,9 @@ def get_spark_session_conf(
9394
"spark.jars.packages": ",".join(maven_packages),
9495
"spark.sql.pyspark.jvmStacktrace.enabled": "true",
9596
"spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
97+
"spark.executor.cores": resources["cpu_cores_per_task"],
98+
"spark.executor.memory": resources["ram_bytes_per_task"],
99+
"spark.executor.instances": resources["max_parallel_tasks"],
96100
}
97101

98102
if maven_packages:

0 commit comments

Comments
 (0)