Commit 63213de

Author: Ilyas Gasanov
Parent: 709c4f5

[DOP-22297] Parallel reading from JDBC sources

File tree: 6 files changed, +46 −3 lines

syncmaster/dto/transfers.py
Lines changed: 2 additions & 0 deletions

@@ -6,6 +6,7 @@
 from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
+from syncmaster.dto.transfers_resources import Resources
 from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy

@@ -19,6 +20,7 @@ class DBTransferDTO(TransferDTO):
     id: int
     table_name: str
     strategy: FullStrategy | IncrementalStrategy
+    resources: Resources
     transformations: list[dict] | None = None
     options: dict | None = None

syncmaster/dto/transfers_resources.py (new file)
Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from dataclasses import dataclass
+
+
+@dataclass
+class Resources:
+    max_parallel_tasks: int
+    cpu_cores_per_task: int
+    ram_bytes_per_task: int
syncmaster/worker/controller.py
Lines changed: 5 additions & 0 deletions

@@ -41,6 +41,7 @@
     SFTPTransferDTO,
     WebDAVTransferDTO,
 )
+from syncmaster.dto.transfers_resources import Resources
 from syncmaster.dto.transfers_strategy import Strategy
 from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
 from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES

@@ -169,6 +170,7 @@ def __init__(
             transfer_id=run.transfer.id,
             transfer_params=run.transfer.source_params,
             strategy_params=run.transfer.strategy_params,
+            resources=run.transfer.resources,
             transformations=run.transfer.transformations,
             connection_auth_data=source_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),

@@ -179,6 +181,7 @@ def __init__(
             transfer_id=run.transfer.id,
             transfer_params=run.transfer.target_params,
             strategy_params=run.transfer.strategy_params,
+            resources=run.transfer.resources,
             transformations=run.transfer.transformations,
             connection_auth_data=target_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),

@@ -215,6 +218,7 @@ def get_handler(
         transfer_id: int,
         transfer_params: dict[str, Any],
         strategy_params: dict[str, Any],
+        resources: dict[str, Any],
         transformations: list[dict],
         temp_dir: TemporaryDirectory,
     ) -> Handler:

@@ -232,6 +236,7 @@ def get_handler(
             transfer_dto=transfer_dto(
                 id=transfer_id,
                 strategy=Strategy.from_dict(strategy_params),
+                resources=Resources(**resources),
                 transformations=transformations,
                 **transfer_params,
             ),

syncmaster/worker/handlers/db/base.py
Lines changed: 21 additions & 0 deletions

@@ -44,6 +44,27 @@ def read(self) -> DataFrame:
                 name=hwm_name,
                 expression=self.transfer_dto.strategy.increment_by,
             )
+            if self.transfer_dto.resources.max_parallel_tasks > 1:
+                reader_params["options"] = {
+                    "partition_column": self.transfer_dto.strategy.increment_by,
+                    "num_partitions": self.transfer_dto.resources.max_parallel_tasks,
+                    "partitioning_mode": "range" if self.hwm and self.hwm.value else "hash",
+                }
+
+        elif self.transfer_dto.strategy.type == "full" and self.transfer_dto.resources.max_parallel_tasks > 1:
+            schema_reader = DBReader(
+                connection=self.connection,
+                table=self.transfer_dto.table_name,
+                columns=self._get_columns_filter_expressions(),
+                options={"fetchsize": 0},
+            )
+            df = schema_reader.run()
+
+            reader_params["options"] = {
+                "partition_column": self._quote_field(df.schema[0].name),
+                "num_partitions": self.transfer_dto.resources.max_parallel_tasks,
+                "partitioning_mode": "hash",
+            }

         reader = DBReader(
             connection=self.connection,
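
To see what these options buy, here is a hedged sketch (not code from this repo) of the parallel JDBC read they configure, using onetl's Postgres connection as an example; the host, credentials, and table name are placeholder assumptions, while the options dict mirrors the one built above.

from pyspark.sql import SparkSession

from onetl.connection import Postgres
from onetl.db import DBReader

spark = (
    SparkSession.builder
    .appName("parallel-jdbc-read")
    .config("spark.jars.packages", ",".join(Postgres.get_packages()))
    .getOrCreate()
)

# Placeholder connection details
postgres = Postgres(host="postgres", user="user", password="secret", database="db", spark=spark)

# With max_parallel_tasks=2, the handler asks Spark to split the table scan
# into 2 JDBC partitions hashed on the partition column, one connection each
reader = DBReader(
    connection=postgres,
    table="public.source_table",
    options={
        "partition_column": "id",
        "num_partitions": 2,
        "partitioning_mode": "hash",
    },
)
df = reader.run()

In incremental runs the commit partitions on the HWM column (increment_by), switching from hash to range mode once an HWM value exists; in full runs it hashes the first column returned by the zero-fetch schema read.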

syncmaster/worker/handlers/db/clickhouse.py
Lines changed: 3 additions & 3 deletions

@@ -46,9 +46,9 @@ def write(self, df: DataFrame) -> None:
                 (col for col in normalized_df.columns if col.lower().endswith("id")),
                 normalized_df.columns[0],  # if there is no column with "id", take the first column
             )
-            quoted_sort_column = f'"{sort_column}"'
-
-            self.transfer_dto.options["createTableOptions"] = f"ENGINE = MergeTree() ORDER BY {quoted_sort_column}"
+            self.transfer_dto.options["createTableOptions"] = (
+                f"ENGINE = MergeTree() ORDER BY {self._quote_field(sort_column)}"
+            )

         if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
             self.transfer_dto.options["if_exists"] = "append"
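
The _quote_field helper replacing the inline f-string is not shown in this diff; judging from the removed line, a hypothetical reconstruction of its behavior:

# Hypothetical stand-in for the handler's _quote_field helper; the removed
# line quoted identifiers with double quotes, so the result is equivalent
def _quote_field(field: str) -> str:
    return f'"{field}"'


options: dict[str, str] = {}
sort_column = "order_id"  # example column name, not from the diff
options["createTableOptions"] = f"ENGINE = MergeTree() ORDER BY {_quote_field(sort_column)}"
assert options["createTableOptions"] == 'ENGINE = MergeTree() ORDER BY "order_id"'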

tests/test_integration/test_run_transfer/test_clickhouse.py
Lines changed: 5 additions & 0 deletions

@@ -47,6 +47,11 @@ async def postgres_to_clickhouse(
             "table_name": f"{clickhouse_for_conftest.user}.target_table",
         },
         strategy_params=strategy,
+        resources={
+            "max_parallel_tasks": 2,
+            "cpu_cores_per_task": 1,
+            "ram_bytes_per_task": 1024**3,
+        },
         transformations=transformations,
         queue_id=queue.id,
     )
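
One detail worth calling out in the fixture: ram_bytes_per_task is expressed in bytes, so 1024**3 requests one GiB per task.

# 1024**3 = 2**30 bytes, i.e. one GiB per parallel task
assert 1024**3 == 1_073_741_824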
