Commit 93b3b98

Author: Ilyas Gasanov (committed)
[DOP-22133] Implement increment for transfers with file sources
1 parent c3473e0 commit 93b3b98

38 files changed, +815 −282 lines changed

.env.docker

Lines changed: 7 additions & 0 deletions

@@ -38,6 +38,13 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
 # Scheduler options
 SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200

+# Horizon
+SYNCMASTER__HORIZON__URL=http://horizon:8000
+SYNCMASTER__HORIZON__NAMESPACE=syncmaster_namespace
+SYNCMASTER__HORIZON__USER=admin
+SYNCMASTER__HORIZON__PASSWORD=password
+
+# Tests-only
 TEST_S3_HOST_FOR_CONFTEST=test-s3
 TEST_S3_PORT_FOR_CONFTEST=9000
 TEST_S3_HOST_FOR_WORKER=test-s3

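The new SYNCMASTER__HORIZON__* variables configure the worker's access to the Horizon HWM store used by the incremental strategy (the controller below reads settings.horizon.url, user, password and namespace). As a rough sketch only, assuming a pydantic-settings model with double-underscore nesting (the real SyncMaster settings classes are not part of this diff), the mapping could look like this:

# Illustrative sketch only: assumes pydantic-settings with "__" nesting, which matches
# the SYNCMASTER__HORIZON__URL naming, but is not taken from this commit.
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict


class HorizonSettingsSketch(BaseModel):
    url: str
    namespace: str
    user: str
    password: str


class WorkerSettingsSketch(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="SYNCMASTER__", env_nested_delimiter="__")

    horizon: HorizonSettingsSketch


settings = WorkerSettingsSketch()  # picks up SYNCMASTER__HORIZON__URL, __USER, __PASSWORD, __NAMESPACE
print(settings.horizon.url)        # http://horizon:8000 with the values above
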
docker-compose.test.yml

Lines changed: 44 additions & 0 deletions

@@ -131,6 +131,49 @@ services:
         condition: service_healthy
     profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

+  horizon:
+    image: mtsrus/horizon-backend:develop
+    restart: unless-stopped
+    env_file: .env.docker
+    environment:
+      # list here usernames which should be assigned SUPERADMIN role on application start
+      HORIZON__ENTRYPOINT__ADMIN_USERS: admin
+      HORIZON__DATABASE__URL: postgresql+asyncpg://horizon:changeme@horizon-db:5432/horizon
+      HORIZON__AUTH__ACCESS_TOKEN__SECRET_KEY: generate_another_random_string
+      HORIZON__AUTH__PROVIDER: horizon.backend.providers.auth.dummy.DummyAuthProvider
+      HORIZON__SERVER__LOGGING__PRESET: colored
+      # PROMETHEUS_MULTIPROC_DIR is required for multiple workers, see:
+      # https://prometheus.github.io/client_python/multiprocess/
+      PROMETHEUS_MULTIPROC_DIR: /tmp/prometheus-metrics
+    # tmpfs dir is cleaned up each container restart
+    tmpfs:
+      - /tmp/prometheus-metrics
+    ports:
+      - 8020:8000
+    depends_on:
+      horizon-db:
+        condition: service_healthy
+    profiles: [horizon, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
+
+  horizon-db:
+    image: postgres
+    restart: unless-stopped
+    environment:
+      TZ: UTC
+      POSTGRES_DB: horizon
+      POSTGRES_USER: horizon
+      POSTGRES_PASSWORD: changeme
+    ports:
+      - 5434:5432
+    volumes:
+      - horizon_test_data:/var/lib/postgresql/data
+    healthcheck:
+      test: pg_isready
+      start_period: 5s
+      interval: 30s
+      timeout: 5s
+      retries: 3
+
   test-postgres:
     image: postgres
     restart: unless-stopped
@@ -366,4 +409,5 @@ services:
 volumes:
   postgres_test_data:
   rabbitmq_test_data:
+  horizon_test_data:
   keycloak_data:

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Implement increment for transfers with file sources

poetry.lock

Lines changed: 190 additions & 69 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 2 additions & 0 deletions

@@ -71,6 +71,7 @@ apscheduler = { version = "^3.10.4", optional = true }
 starlette-exporter = {version = "^0.23.0", optional = true}
 itsdangerous = {version = "*", optional = true}
 python-keycloak = {version = ">=4.7,<6.0", optional = true}
+horizon-hwm-store = {version = ">=1.0.2", optional = true }

 [tool.poetry.extras]
 server = [
@@ -109,6 +110,7 @@ worker = [
     "coloredlogs",
     "python-json-logger",
     "pyyaml",
+    "horizon-hwm-store",
 ]

 scheduler = [

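horizon-hwm-store is added as an optional dependency and is only pulled in by the worker extra, so environments installed without that extra will not have the package. A hypothetical guard (not present in this commit) illustrating that boundary:

# Hypothetical sketch, not from this commit: the import succeeds only when the
# package was installed via the "worker" extra (e.g. pip install "syncmaster[worker]").
try:
    from horizon_hwm_store import HorizonHWMStore
except ImportError:  # server/scheduler installs without the extra
    HorizonHWMStore = None  # incremental transfers cannot run in this environment
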
syncmaster/dto/transfers.py

Lines changed: 7 additions & 1 deletion

@@ -6,6 +6,8 @@

 from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet

+from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy
+

 @dataclass
 class TransferDTO:
@@ -14,14 +16,18 @@ class TransferDTO:

 @dataclass
 class DBTransferDTO(TransferDTO):
+    id: int
     table_name: str
+    strategy: FullStrategy | IncrementalStrategy
     transformations: list[dict] | None = None


 @dataclass
 class FileTransferDTO(TransferDTO):
+    id: int
     directory_path: str
     file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
+    strategy: FullStrategy | IncrementalStrategy
     options: dict
     file_name_template: str | None = None
     df_schema: dict | None = None
@@ -43,7 +49,7 @@ def __post_init__(self):
         if isinstance(self.df_schema, str):
             self.df_schema = json.loads(self.df_schema)

-        self.options.setdefault("if_exists", "replace_entire_directory")  # TODO: use "append" for incremental strategy
+        self.options.setdefault("if_exists", "replace_entire_directory")

     def _get_file_format(self, file_format: dict) -> CSV | JSONLine | JSON | Excel | XML | ORC | Parquet:
         file_type = file_format.pop("type", None)

syncmaster/dto/transfers_strategy.py

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+@dataclass
+class Strategy:
+    type: str
+
+    @classmethod
+    def from_dict(cls, data: dict) -> Strategy:
+        strategy_classes = {
+            "full": FullStrategy,
+            "incremental": IncrementalStrategy,
+        }
+
+        strategy_type = data.get("type")
+        if strategy_type not in strategy_classes:
+            raise ValueError(f"Unknown strategy type: {strategy_type}")
+
+        return strategy_classes[strategy_type](**data)
+
+
+@dataclass
+class FullStrategy(Strategy):
+    pass
+
+
+@dataclass
+class IncrementalStrategy(Strategy):
+    increment_by: str

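A short usage sketch of the new factory, based only on the code above; the dict shape mirrors the transfer's strategy_params as consumed in syncmaster/worker/controller.py:

from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy, Strategy

full = Strategy.from_dict({"type": "full"})
assert isinstance(full, FullStrategy)

incremental = Strategy.from_dict({"type": "incremental", "increment_by": "file_name"})
assert isinstance(incremental, IncrementalStrategy)
assert incremental.increment_by == "file_name"

try:
    Strategy.from_dict({"type": "snapshot"})  # "snapshot" is an illustrative unknown type
except ValueError as e:
    print(e)  # Unknown strategy type: snapshot
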
syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 4 additions & 2 deletions

@@ -255,8 +255,10 @@ def validate_increment_by(cls, values):
         source_type = values.source_params.type
         increment_by = values.strategy_params.increment_by

-        if source_type in FILE_CONNECTION_TYPES and increment_by != "modified_since":
-            raise ValueError("Field 'increment_by' must be equal to 'modified_since' for file source types")
+        if source_type in FILE_CONNECTION_TYPES and increment_by not in ("modified_since", "file_name"):
+            raise ValueError(
+                "Field 'increment_by' must be equal to 'modified_since' or 'file_name' for file source types",
+            )

         return values

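Illustrative strategy_params values for a file source; only the values are shown here, the surrounding request schema and the FILE_CONNECTION_TYPES definition are not part of this diff:

# Accepted for file sources after this change:
ok_before_and_after = {"type": "incremental", "increment_by": "modified_since"}
ok_new_in_this_commit = {"type": "incremental", "increment_by": "file_name"}

# Still rejected for file sources, raising:
# ValueError: Field 'increment_by' must be equal to 'modified_since' or 'file_name' for file source types
rejected = {"type": "incremental", "increment_by": "updated_at"}
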
syncmaster/worker/controller.py

Lines changed: 66 additions & 3 deletions

@@ -4,6 +4,15 @@
 from tempfile import TemporaryDirectory
 from typing import Any

+from horizon.client.auth import LoginPassword
+from horizon.client.sync import HorizonClientSync
+from horizon.commons.schemas.v1 import (
+    NamespaceCreateRequestV1,
+    NamespacePaginateQueryV1,
+)
+from horizon_hwm_store import HorizonHWMStore
+from onetl.strategy import IncrementalStrategy
+
 from syncmaster.db.models import Connection, Run
 from syncmaster.dto.connections import (
     ClickhouseConnectionDTO,
@@ -36,6 +45,7 @@
     SFTPTransferDTO,
     WebDAVTransferDTO,
 )
+from syncmaster.dto.transfers_strategy import Strategy
 from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
 from syncmaster.worker.handlers.base import Handler
 from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
@@ -139,11 +149,13 @@


 class TransferController:
+    settings: WorkerAppSettings
     source_handler: Handler
     target_handler: Handler

     def __init__(
         self,
+        settings: WorkerAppSettings,
         run: Run,
         source_connection: Connection,
         source_auth_data: dict,
@@ -152,27 +164,32 @@ def __init__(
     ):
         self.temp_dir = TemporaryDirectory(prefix=f"syncmaster_{run.id}_")

+        self.settings = settings
         self.run = run
         self.source_handler = self.get_handler(
             connection_data=source_connection.data,
             run_data={"id": run.id, "created_at": run.created_at},
+            transfer_id=run.transfer.id,
             transfer_params=run.transfer.source_params,
+            strategy_params=run.transfer.strategy_params,
             transformations=run.transfer.transformations,
             connection_auth_data=source_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
         )
         self.target_handler = self.get_handler(
             connection_data=target_connection.data,
             run_data={"id": run.id, "created_at": run.created_at},
+            transfer_id=run.transfer.id,
             transfer_params=run.transfer.target_params,
+            strategy_params=run.transfer.strategy_params,
             transformations=run.transfer.transformations,
             connection_auth_data=target_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
         )

-    def perform_transfer(self, settings: WorkerAppSettings) -> None:
+    def perform_transfer(self) -> None:
         try:
-            spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
+            spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
                 run=self.run,
                 source=self.source_handler.connection_dto,
                 target=self.target_handler.connection_dto,
@@ -182,6 +199,9 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None:
             self.source_handler.connect(spark)
             self.target_handler.connect(spark)

+            if self.source_handler.transfer_dto.strategy.type == "incremental":
+                return self._perform_incremental_transfer()
+
             df = self.source_handler.read()
             self.target_handler.write(df)
         finally:
@@ -192,7 +212,9 @@ def get_handler(
         connection_data: dict[str, Any],
         connection_auth_data: dict,
         run_data: dict[str, Any],
+        transfer_id: int,
         transfer_params: dict[str, Any],
+        strategy_params: dict[str, Any],
         transformations: list[dict],
         temp_dir: TemporaryDirectory,
     ) -> Handler:
@@ -207,7 +229,48 @@

         return handler(
             connection_dto=connection_dto(**connection_data),
-            transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
+            transfer_dto=transfer_dto(
+                id=transfer_id,
+                strategy=Strategy.from_dict(strategy_params),
+                transformations=transformations,
+                **transfer_params,
+            ),
             run_dto=run_dto(**run_data),
             temp_dir=temp_dir,
         )
+
+    def _perform_incremental_transfer(self) -> None:
+        self._ensure_horizon_namespace_exists()
+
+        with HorizonHWMStore(
+            api_url=self.settings.horizon.url,
+            auth=LoginPassword(login=self.settings.horizon.user, password=self.settings.horizon.password),
+            namespace=self.settings.horizon.namespace,
+        ) as hwm_store:
+            with IncrementalStrategy():
+                hwm_name = "_".join(
+                    [
+                        str(self.source_handler.transfer_dto.id),
+                        self.source_handler.connection_dto.type,
+                        self.source_handler.transfer_dto.directory_path,
+                    ],
+                )
+                hwm = hwm_store.get_hwm(hwm_name)
+
+                # S3 & HDFS sources do not currently support incremental reading
+                if not isinstance(self.source_handler, (S3Handler, HDFSHandler)):
+                    self.source_handler.hwm = hwm
+                    self.target_handler.hwm = hwm
+
+                df = self.source_handler.read()
+                self.target_handler.write(df)
+
+    def _ensure_horizon_namespace_exists(self) -> None:
+        client = HorizonClientSync(
+            base_url=self.settings.horizon.url,
+            auth=LoginPassword(login=self.settings.horizon.user, password=self.settings.horizon.password),
+        )
+        client.authorize()
+        namespace_query = NamespacePaginateQueryV1(name=self.settings.horizon.namespace)
+        if not client.paginate_namespaces(query=namespace_query).items:
+            client.create_namespace(NamespaceCreateRequestV1(name=self.settings.horizon.namespace))

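A hypothetical call site reflecting the changed signatures: WorkerAppSettings now goes to the constructor and perform_transfer() takes no arguments. The HWM name is built from the transfer id, the source connection type and the directory path (e.g. "42_s3_/data/raw" for illustrative values), so each file transfer keeps its own high-water mark in the configured Horizon namespace.

# Hypothetical wiring, not part of this commit; run/connection objects come from the worker task.
controller = TransferController(
    settings=settings,                  # WorkerAppSettings with the new .horizon section
    run=run,
    source_connection=source_connection,
    source_auth_data=source_auth_data,
    target_connection=target_connection,
    target_auth_data=target_auth_data,
)
controller.perform_transfer()  # routes to _perform_incremental_transfer() when strategy.type == "incremental"
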
syncmaster/worker/handlers/base.py

Lines changed: 4 additions & 0 deletions

@@ -7,6 +7,8 @@
 from tempfile import TemporaryDirectory
 from typing import TYPE_CHECKING

+from etl_entities.hwm import HWM
+
 from syncmaster.dto.connections import ConnectionDTO
 from syncmaster.dto.runs import RunDTO
 from syncmaster.dto.transfers import TransferDTO
@@ -17,6 +19,8 @@


 class Handler(ABC):
+    hwm: HWM | None = None
+
     def __init__(
         self,
         connection_dto: ConnectionDTO,

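Handlers now carry an optional HWM that TransferController assigns before read()/write() in incremental runs (S3 and HDFS sources excluded for now). A purely hypothetical subclass showing where that attribute could be consulted; the helper methods are placeholders, not part of SyncMaster:

# Hypothetical sketch only; _list_all_files / _list_files_newer_than / _read are placeholders.
class ExampleFileHandler(Handler):
    def read(self):
        if self.hwm is None or self.hwm.value is None:
            files = self._list_all_files()  # first run: no high-water mark stored yet
        else:
            files = self._list_files_newer_than(self.hwm.value)  # skip data already covered by the HWM
        return self._read(files)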