Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.docker
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
# Scheduler options
SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200

# HWM Store
SYNCMASTER__HWM_STORE__TYPE=horizon
SYNCMASTER__HWM_STORE__URL=http://horizon:8000
SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
SYNCMASTER__HWM_STORE__USER=admin
SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@!

# Tests-only
TEST_S3_HOST_FOR_CONFTEST=test-s3
TEST_S3_PORT_FOR_CONFTEST=9000
TEST_S3_HOST_FOR_WORKER=test-s3
Expand Down
7 changes: 7 additions & 0 deletions .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export SYNCMASTER__AUTH__ACCESS_TOKEN__SECRET_KEY=generate_another_random_string
# Scheduler options
export SYNCMASTER__SCHEDULER__TRANSFER_FETCHING_TIMEOUT_SECONDS=200

# HWM Store
export SYNCMASTER__HWM_STORE__TYPE=horizon
export SYNCMASTER__HWM_STORE__URL=http://localhost:8020
export SYNCMASTER__HWM_STORE__NAMESPACE=syncmaster_namespace
export SYNCMASTER__HWM_STORE__USER=admin
export SYNCMASTER__HWM_STORE__PASSWORD=123UsedForTestOnly@!

# Tests-only
export TEST_S3_HOST_FOR_CONFTEST=localhost
export TEST_S3_PORT_FOR_CONFTEST=9010
Expand Down
49 changes: 47 additions & 2 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,51 @@ services:
condition: service_completed_successfully
rabbitmq:
condition: service_healthy
profiles: [worker, scheduler, s3, oracle, hdfs, hive, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]
profiles: [worker, scheduler, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

horizon:
image: mtsrus/horizon-backend:develop
restart: unless-stopped
env_file: .env.docker
environment:
# list here usernames which should be assigned SUPERADMIN role on application start
HORIZON__ENTRYPOINT__ADMIN_USERS: admin
HORIZON__DATABASE__URL: postgresql+asyncpg://horizon:changeme@horizon-db:5432/horizon
HORIZON__AUTH__ACCESS_TOKEN__SECRET_KEY: generate_another_random_string
HORIZON__AUTH__PROVIDER: horizon.backend.providers.auth.dummy.DummyAuthProvider
HORIZON__SERVER__LOGGING__PRESET: colored
# PROMETHEUS_MULTIPROC_DIR is required for multiple workers, see:
# https://prometheus.github.io/client_python/multiprocess/
PROMETHEUS_MULTIPROC_DIR: /tmp/prometheus-metrics
# tmpfs dir is cleaned up each container restart
tmpfs:
- /tmp/prometheus-metrics
ports:
- 8020:8000
depends_on:
horizon-db:
condition: service_healthy
profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

horizon-db:
image: postgres
restart: unless-stopped
environment:
TZ: UTC
POSTGRES_DB: horizon
POSTGRES_USER: horizon
POSTGRES_PASSWORD: changeme
ports:
- 5434:5432
volumes:
- horizon_test_data:/var/lib/postgresql/data
healthcheck:
test: pg_isready
start_period: 5s
interval: 30s
timeout: 5s
retries: 3
profiles: [horizon, s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

test-postgres:
image: postgres
Expand All @@ -147,7 +191,7 @@ services:
interval: 30s
timeout: 5s
retries: 3
profiles: [s3, oracle, clickhouse, mysql, mssql, hdfs, hive, sftp, ftp, ftps, samba, webdav, all]
profiles: [s3, hdfs, hive, oracle, clickhouse, mysql, mssql, sftp, ftp, ftps, samba, webdav, all]

test-s3:
image: bitnami/minio:latest
Expand Down Expand Up @@ -366,4 +410,5 @@ services:
volumes:
postgres_test_data:
rabbitmq_test_data:
horizon_test_data:
keycloak_data:
1 change: 1 addition & 0 deletions docs/changelog/next_release/209.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement incremental strategy support for transfers with file sources
259 changes: 190 additions & 69 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ apscheduler = { version = "^3.10.4", optional = true }
starlette-exporter = {version = "^0.23.0", optional = true}
itsdangerous = {version = "*", optional = true}
python-keycloak = {version = ">=4.7,<6.0", optional = true}
horizon-hwm-store = {version = ">=1.1.0", optional = true }

[tool.poetry.extras]
server = [
Expand Down Expand Up @@ -109,6 +110,7 @@ worker = [
"coloredlogs",
"python-json-logger",
"pyyaml",
"horizon-hwm-store",
]

scheduler = [
Expand Down
18 changes: 17 additions & 1 deletion syncmaster/dto/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet

from syncmaster.dto.transfers_strategy import FullStrategy, IncrementalStrategy


@dataclass
class TransferDTO:
Expand All @@ -14,14 +16,24 @@ class TransferDTO:

@dataclass
class DBTransferDTO(TransferDTO):
    """Parameters of a database-backed transfer (source or target side)."""

    id: int  # transfer id, used e.g. to build HWM names for incremental runs
    table_name: str  # table to read from / write to
    strategy: FullStrategy | IncrementalStrategy
    transformations: list[dict] | None = None
    options: dict | None = None  # reader/writer options; normalized below

    def __post_init__(self):
        # Normalize missing options to a dict (keeping a caller-provided dict
        # as-is) and apply the DB-wide default write mode.
        self.options = self.options if self.options is not None else {}
        self.options.setdefault("if_exists", "replace_entire_table")


@dataclass
class FileTransferDTO(TransferDTO):
id: int
directory_path: str
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
strategy: FullStrategy | IncrementalStrategy
options: dict
file_name_template: str | None = None
df_schema: dict | None = None
Expand All @@ -43,7 +55,7 @@ def __post_init__(self):
if isinstance(self.df_schema, str):
self.df_schema = json.loads(self.df_schema)

self.options.setdefault("if_exists", "replace_entire_directory") # TODO: use "append" for incremental strategy
self.options.setdefault("if_exists", "replace_overlapping_partitions")

def _get_file_format(self, file_format: dict) -> CSV | JSONLine | JSON | Excel | XML | ORC | Parquet:
file_type = file_format.pop("type", None)
Expand Down Expand Up @@ -87,6 +99,10 @@ class MySQLTransferDTO(DBTransferDTO):
class HiveTransferDTO(DBTransferDTO):
type: ClassVar[str] = "hive"

def __post_init__(self):
super().__post_init__()
self.options.setdefault("if_exists", "replace_overlapping_partitions")


@dataclass
class S3TransferDTO(FileTransferDTO):
Expand Down
33 changes: 33 additions & 0 deletions syncmaster/dto/transfers_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Strategy:
    """Transfer strategy settings deserialized from ``strategy_params``.

    Base of a small closed hierarchy: :class:`FullStrategy` and
    :class:`IncrementalStrategy`.
    """

    # Discriminator stored alongside the payload, e.g. "full" / "incremental".
    type: str

    @classmethod
    def from_dict(cls, data: dict) -> "Strategy":
        """Instantiate the concrete strategy named by ``data["type"]``.

        All keys of *data* (including ``type``) are forwarded to the concrete
        dataclass constructor.

        Raises:
            ValueError: if ``type`` is missing or not a known strategy name.
        """
        strategy_type = data.get("type")
        if strategy_type == "full":
            return FullStrategy(**data)
        if strategy_type == "incremental":
            return IncrementalStrategy(**data)
        raise ValueError(f"Unknown strategy type: {strategy_type}")


@dataclass
class FullStrategy(Strategy):
    """Full reload: every run reads the entire source."""


@dataclass
class IncrementalStrategy(Strategy):
    """Incremental reload: only data past the stored high-water mark."""

    # Column (for DB sources) or file attribute ("file_modified_since" /
    # "file_name" for file sources) used to compute the increment.
    increment_by: str
14 changes: 12 additions & 2 deletions syncmaster/schemas/v1/transfers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,18 @@ def validate_increment_by(cls, values):
source_type = values.source_params.type
increment_by = values.strategy_params.increment_by

if source_type in FILE_CONNECTION_TYPES and increment_by != "modified_since":
raise ValueError("Field 'increment_by' must be equal to 'modified_since' for file source types")
if source_type in FILE_CONNECTION_TYPES and increment_by not in ("file_modified_since", "file_name"):
raise ValueError(
"Field 'increment_by' must be equal to 'file_modified_since' or 'file_name' for file source types",
)

return values

    @model_validator(mode="after")
    def validate_strategy(cls, values):
        """Reject the incremental strategy for S3/HDFS sources.

        NOTE(review): the check compares against the literal type strings
        "s3" and "hdfs" — confirm these match the ``source_params.type``
        values used elsewhere (e.g. the FILE_CONNECTION_TYPES set).
        """
        if values.source_params.type in ("s3", "hdfs") and isinstance(values.strategy_params, IncrementalStrategy):
            raise ValueError("S3 and HDFS sources do not support incremental strategy for now")

        return values

Expand Down
3 changes: 3 additions & 0 deletions syncmaster/schemas/v1/transfers/file/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def validate_file_name_template(cls, value: str) -> str:
if missing_keys:
raise ValueError(f"Missing required placeholders: {', '.join(missing_keys)}")

if "{run_id}" not in value and "{run_created_at}" not in value:
raise ValueError("At least one of placeholders must be present: {run_id} or {run_created_at}")

try:
value.format(index="", extension="", run_created_at="", run_id="")
except KeyError as e:
Expand Down
51 changes: 48 additions & 3 deletions syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from tempfile import TemporaryDirectory
from typing import Any

from horizon.client.auth import LoginPassword
from horizon_hwm_store import HorizonHWMStore
from onetl.strategy import IncrementalStrategy

from syncmaster.db.models import Connection, Run
from syncmaster.dto.connections import (
ClickhouseConnectionDTO,
Expand Down Expand Up @@ -36,6 +40,7 @@
SFTPTransferDTO,
WebDAVTransferDTO,
)
from syncmaster.dto.transfers_strategy import Strategy
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
from syncmaster.worker.handlers.base import Handler
from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
Expand Down Expand Up @@ -139,11 +144,13 @@


class TransferController:
settings: WorkerAppSettings
source_handler: Handler
target_handler: Handler

def __init__(
self,
settings: WorkerAppSettings,
run: Run,
source_connection: Connection,
source_auth_data: dict,
Expand All @@ -152,27 +159,32 @@ def __init__(
):
self.temp_dir = TemporaryDirectory(prefix=f"syncmaster_{run.id}_")

self.settings = settings
self.run = run
self.source_handler = self.get_handler(
connection_data=source_connection.data,
run_data={"id": run.id, "created_at": run.created_at},
transfer_id=run.transfer.id,
transfer_params=run.transfer.source_params,
strategy_params=run.transfer.strategy_params,
transformations=run.transfer.transformations,
connection_auth_data=source_auth_data,
temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
)
self.target_handler = self.get_handler(
connection_data=target_connection.data,
run_data={"id": run.id, "created_at": run.created_at},
transfer_id=run.transfer.id,
transfer_params=run.transfer.target_params,
strategy_params=run.transfer.strategy_params,
transformations=run.transfer.transformations,
connection_auth_data=target_auth_data,
temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
)

def perform_transfer(self, settings: WorkerAppSettings) -> None:
def perform_transfer(self) -> None:
try:
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
run=self.run,
source=self.source_handler.connection_dto,
target=self.target_handler.connection_dto,
Expand All @@ -182,6 +194,9 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None:
self.source_handler.connect(spark)
self.target_handler.connect(spark)

if self.source_handler.transfer_dto.strategy.type == "incremental":
return self._perform_incremental_transfer()

df = self.source_handler.read()
self.target_handler.write(df)
finally:
Expand All @@ -192,7 +207,9 @@ def get_handler(
connection_data: dict[str, Any],
connection_auth_data: dict,
run_data: dict[str, Any],
transfer_id: int,
transfer_params: dict[str, Any],
strategy_params: dict[str, Any],
transformations: list[dict],
temp_dir: TemporaryDirectory,
) -> Handler:
Expand All @@ -207,7 +224,35 @@ def get_handler(

return handler(
connection_dto=connection_dto(**connection_data),
transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
transfer_dto=transfer_dto(
id=transfer_id,
strategy=Strategy.from_dict(strategy_params),
transformations=transformations,
**transfer_params,
),
run_dto=run_dto(**run_data),
temp_dir=temp_dir,
)

    def _perform_incremental_transfer(self) -> None:
        """Run the transfer under onETL's incremental strategy, persisting the
        high-water mark (HWM) in a Horizon HWM store between runs.
        """
        # Connect to the Horizon HWM store configured for the worker; the
        # namespace is force-created so a fresh deployment works on first run.
        with HorizonHWMStore(
            api_url=self.settings.hwm_store.url,
            auth=LoginPassword(login=self.settings.hwm_store.user, password=self.settings.hwm_store.password),
            namespace=self.settings.hwm_store.namespace,
        ).force_create_namespace() as hwm_store:

            with IncrementalStrategy():
                # HWM identity: one mark per (transfer id, source type, path).
                # NOTE(review): ``directory_path`` exists only on
                # FileTransferDTO — a DB transfer DTO has ``table_name``
                # instead, so this would raise AttributeError for DB sources;
                # confirm incremental runs are file-only before relying on it.
                hwm_name = "_".join(
                    [
                        str(self.source_handler.transfer_dto.id),
                        self.source_handler.connection_dto.type,
                        self.source_handler.transfer_dto.directory_path,
                    ],
                )
                hwm = hwm_store.get_hwm(hwm_name)

                # Share the stored HWM with both handlers: the source uses it
                # to skip already-processed data, the target to pick "append".
                self.source_handler.hwm = hwm
                self.target_handler.hwm = hwm

                df = self.source_handler.read()
                self.target_handler.write(df)
4 changes: 4 additions & 0 deletions syncmaster/worker/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING

from etl_entities.hwm import HWM

from syncmaster.dto.connections import ConnectionDTO
from syncmaster.dto.runs import RunDTO
from syncmaster.dto.transfers import TransferDTO
Expand All @@ -17,6 +19,8 @@


class Handler(ABC):
hwm: HWM | None = None

def __init__(
self,
connection_dto: ConnectionDTO,
Expand Down
5 changes: 5 additions & 0 deletions syncmaster/worker/handlers/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
class DBHandler(Handler):
connection: BaseDBConnection
transfer_dto: DBTransferDTO

_operators = {
"is_null": "IS NULL",
"is_not_null": "IS NOT NULL",
Expand All @@ -44,9 +45,13 @@ def read(self) -> DataFrame:
return reader.run()

def write(self, df: DataFrame) -> None:
if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
self.transfer_dto.options["if_exists"] = "append"

writer = DBWriter(
connection=self.connection,
table=self.transfer_dto.table_name,
options=self.transfer_dto.options,
)
return writer.run(df=self._normalize_column_names(df))

Expand Down
Loading
Loading