Skip to content

Commit ff45600

Browse files
author
Ilyas Gasanov
committed
[DOP-22133] Implement increment for transfers with file sources
1 parent c3473e0 commit ff45600

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

47 files changed

+1086
-351
lines changed

.env.docker

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3838
# Scheduler options
3939
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

41+
# HWM Store
42+
SYNCMASTER__HWM_STORE__TYPE=horizon
43+
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
44+
SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
45+
SYNCMASTER__HWM_STORE__USER=admin
46+
SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@!
47+
48+
# Tests-only
4149
TEST_S3_HOST_FOR_CONFTEST=test-s3
4250
TEST_S3_PORT_FOR_CONFTEST=9000
4351
TEST_S3_HOST_FOR_WORKER=test-s3

.env.local

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
3838
# Scheduler options
3939
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200
4040

41+
# HWM Store
42+
export SYNCMASTER__HWM_STORE__TYPE=horizon
43+
export SYNCMASTER__HWM_STORE__URL=http://localhost:8020
44+
export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
45+
export SYNCMASTER__HWM_STORE__USER=admin
46+
export SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@!
47+
4148
# Tests-only
4249
export TEST_S3_HOST_FOR_CONFTEST=localhost
4350
export TEST_S3_PORT_FOR_CONFTEST=9010

docker-compose.test.yml

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,51 @@ services:
129129
condition: service_completed_successfully
130130
rabbitmq:
131131
condition: service_healthy
132-
profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
132+
profiles: [worker, scheduler, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
133+
134+
horizon:
135+
image: mtsrus/horizon-backend:develop
136+
restart: unless-stopped
137+
env_file: .env.docker
138+
environment:
139+
# list here usernames which should be assigned SUPERADMIN role on application start
140+
HORIZON__ENTRYPOINT__ADMIN_USERS: admin
141+
HORIZON__DATABASE__URL: postgresql+asyncpg://horizon:changeme@horizon-db:5432/horizon
142+
HORIZON__AUTH__ACCESS_TOKEN__SECRET_KEY: generate_another_random_string
143+
HORIZON__AUTH__PROVIDER: horizon.backend.providers.auth.dummy.DummyAuthProvider
144+
HORIZON__SERVER__LOGGING__PRESET: colored
145+
# PROMETHEUS_MULTIPROC_DIR is required for multiple workers, see:
146+
# https://prometheus.github.io/client_python/multiprocess/
147+
PROMETHEUS_MULTIPROC_DIR: /tmp/prometheus-metrics
148+
# tmpfs dir is cleaned up each container restart
149+
tmpfs:
150+
- /tmp/prometheus-metrics
151+
ports:
152+
- 8020:8000
153+
depends_on:
154+
horizon-db:
155+
condition: service_healthy
156+
profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
157+
158+
horizon-db:
159+
image: postgres
160+
restart: unless-stopped
161+
environment:
162+
TZ: UTC
163+
POSTGRES_DB: horizon
164+
POSTGRES_USER: horizon
165+
POSTGRES_PASSWORD: changeme
166+
ports:
167+
- 5434:5432
168+
volumes:
169+
- horizon_test_data:/var/lib/postgresql/data
170+
healthcheck:
171+
test: pg_isready
172+
start_period: 5s
173+
interval: 30s
174+
timeout: 5s
175+
retries: 3
176+
profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
133177

134178
test-postgres:
135179
image: postgres
@@ -147,7 +191,7 @@ services:
147191
interval: 30s
148192
timeout: 5s
149193
retries: 3
150-
profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, webdav, all]
194+
profiles: [s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
151195

152196
test-s3:
153197
image: bitnami/minio:latest
@@ -366,4 +410,5 @@ services:
366410
volumes:
367411
postgres_test_data:
368412
rabbitmq_test_data:
413+
horizon_test_data:
369414
keycloak_data:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement increment for transfers with file sources

poetry.lock

Lines changed: 190 additions & 69 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ apscheduler = { version = "^3.10.4", optional = true }
7171
starlette-exporter = {version = "^0.23.0", optional = true}
7272
itsdangerous = {version = "*", optional = true}
7373
python-keycloak = {version = ">=4.7,<6.0", optional = true}
74+
horizon-hwm-store = {version = ">=1.0.2", optional = true }
7475

7576
[tool.poetry.extras]
7677
server = [
@@ -109,6 +110,7 @@ worker = [
109110
"coloredlogs",
110111
"python-json-logger",
111112
"pyyaml",
113+
"horizon-hwm-store",
112114
]
113115

114116
scheduler = [

syncmaster/dto/transfers.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
88

9+
from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy
10+
911

1012
@dataclass
1113
class TransferDTO:
@@ -14,14 +16,24 @@ class TransferDTO:
1416

1517
@dataclass
1618
class DBTransferDTO(TransferDTO):
19+
id: int
1720
table_name: str
21+
strategy: FullStrategy | IncrementalStrategy
1822
transformations: list[dict] | None = None
23+
options: dict | None = None
24+
25+
def __post_init__(self):
26+
if self.options is None:
27+
self.options = {}
28+
self.options.setdefault("if_exists", "replace_entire_table")
1929

2030

2131
@dataclass
2232
class FileTransferDTO(TransferDTO):
33+
id: int
2334
directory_path: str
2435
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
36+
strategy: FullStrategy | IncrementalStrategy
2537
options: dict
2638
file_name_template: str | None = None
2739
df_schema: dict | None = None
@@ -43,7 +55,7 @@ def __post_init__(self):
4355
if isinstance(self.df_schema, str):
4456
self.df_schema = json.loads(self.df_schema)
4557

46-
self.options.setdefault("if_exists", "replace_entire_directory") # TODO: use "append" for incremental strategy
58+
self.options.setdefault("if_exists", "replace_overlapping_partitions")
4759

4860
def _get_file_format(self, file_format: dict) -> CSV | JSONLine | JSON | Excel | XML | ORC | Parquet:
4961
file_type = file_format.pop("type", None)
@@ -87,6 +99,10 @@ class MySQLTransferDTO(DBTransferDTO):
8799
class HiveTransferDTO(DBTransferDTO):
88100
type: ClassVar[str] = "hive"
89101

102+
def __post_init__(self):
103+
super().__post_init__()
104+
self.options.setdefault("if_exists", "replace_overlapping_partitions")
105+
90106

91107
@dataclass
92108
class S3TransferDTO(FileTransferDTO):
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Strategy:
    """Base class for transfer read strategies.

    ``type`` is the discriminator naming the concrete strategy
    ("full" or "incremental").
    """

    type: str

    @classmethod
    def from_dict(cls, data: dict) -> Strategy:
        """Instantiate the concrete strategy subclass named by ``data["type"]``.

        All keys of ``data`` are forwarded to the subclass constructor.
        Raises ``ValueError`` when the type is missing or unrecognized.
        """
        registry: dict[str, type[Strategy]] = {
            "full": FullStrategy,
            "incremental": IncrementalStrategy,
        }

        kind = data.get("type")
        if kind not in registry:
            raise ValueError(f"Unknown strategy type: {kind}")

        return registry[kind](**data)


@dataclass
class FullStrategy(Strategy):
    """Read the whole source on every run."""


@dataclass
class IncrementalStrategy(Strategy):
    """Read only rows/files newer than the stored high-water mark.

    ``increment_by`` names the column (or file attribute) used as the HWM.
    """

    increment_by: str

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,18 @@ def validate_increment_by(cls, values):
255255
source_type = values.source_params.type
256256
increment_by = values.strategy_params.increment_by
257257

258-
if source_type in FILE_CONNECTION_TYPES and increment_by != "modified_since":
259-
raise ValueError("Field 'increment_by' must be equal to 'modified_since' for file source types")
258+
if source_type in FILE_CONNECTION_TYPES and increment_by not in ("file_modified_since", "file_name"):
259+
raise ValueError(
260+
"Field 'increment_by' must be equal to 'file_modified_since' or 'file_name' for file source types",
261+
)
262+
263+
return values
264+
265+
@model_validator(mode="after")
266+
def validate_strategy(cls, values):
267+
268+
if values.source_params.type in ("s3", "hdfs") and isinstance(values.strategy_params, IncrementalStrategy):
269+
raise ValueError("S3 and HDFS sources do not support incremental strategy for now")
260270

261271
return values
262272

syncmaster/schemas/v1/transfers/file/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ def validate_file_name_template(cls, value: str) -> str:
9494
if missing_keys:
9595
raise ValueError(f"Missing required placeholders: {', '.join(missing_keys)}")
9696

97+
if "{run_id}" not in value and "{run_created_at}" not in value:
98+
raise ValueError("At least one of placeholders must be present: {run_id} or {run_created_at}")
99+
97100
try:
98101
value.format(index="", extension="", run_created_at="", run_id="")
99102
except KeyError as e:

0 commit comments

Comments
 (0)