Skip to content

Commit 53b80b4

Browse files
[DOP-22348] Add transformations for Transfers with dataframe row filtering (#184)
1 parent 9b79921 commit 53b80b4

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+1187
-132
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add transformations for **Transfers** with dataframe row filtering

poetry.lock

Lines changed: 237 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pytest-asyncio = "^0.25.1"
131131
pytest-randomly = "^3.15.0"
132132
pytest-deadfixtures = "^2.2.1"
133133
pytest-mock = "^3.14.0"
134+
pytest-lazy-fixtures = "^1.1.1"
134135
onetl = {extras = ["spark", "s3", "hdfs"], version = "^0.12.0"}
135136
faker = "^33.3.0"
136137
coverage = "^7.6.1"

syncmaster/db/migrations/versions/2023-11-23_0007_create_transfer_table.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def upgrade():
4444
sa.Column("strategy_params", sa.JSON(), nullable=False),
4545
sa.Column("source_params", sa.JSON(), nullable=False),
4646
sa.Column("target_params", sa.JSON(), nullable=False),
47+
sa.Column("transformations", sa.JSON(), nullable=False),
4748
sa.Column("is_scheduled", sa.Boolean(), nullable=False),
4849
sa.Column("schedule", sa.String(length=32), nullable=False),
4950
sa.Column("queue_id", sa.BigInteger(), nullable=False),

syncmaster/db/models/transfer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class Transfer(
4646
strategy_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
4747
source_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
4848
target_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
49+
transformations: Mapped[list[dict[str, Any]]] = mapped_column(JSON, nullable=False, default=list)
4950
is_scheduled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
5051
schedule: Mapped[str] = mapped_column(String(32), nullable=False, default="")
5152
queue_id: Mapped[int] = mapped_column(

syncmaster/db/repositories/transfer.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ async def create(
115115
source_params: dict[str, Any],
116116
target_params: dict[str, Any],
117117
strategy_params: dict[str, Any],
118+
transformations: list[dict[str, Any]],
118119
queue_id: int,
119120
is_scheduled: bool,
120121
schedule: str | None,
@@ -130,6 +131,7 @@ async def create(
130131
source_params=source_params,
131132
target_params=target_params,
132133
strategy_params=strategy_params,
134+
transformations=transformations,
133135
queue_id=queue_id,
134136
is_scheduled=is_scheduled,
135137
schedule=schedule or "",
@@ -154,20 +156,21 @@ async def update(
154156
source_params: dict[str, Any],
155157
target_params: dict[str, Any],
156158
strategy_params: dict[str, Any],
159+
transformations: list[dict[str, Any]],
157160
is_scheduled: bool | None,
158161
schedule: str | None,
159162
new_queue_id: int | None,
160163
) -> Transfer:
161164
try:
162-
for key in transfer.source_params:
163-
if key not in source_params or source_params[key] is None:
164-
source_params[key] = transfer.source_params[key]
165-
for key in transfer.target_params:
166-
if key not in target_params or target_params[key] is None:
167-
target_params[key] = transfer.target_params[key]
168-
for key in transfer.strategy_params:
169-
if key not in strategy_params or strategy_params[key] is None:
170-
strategy_params[key] = transfer.strategy_params[key]
165+
for old, new in [
166+
(transfer.source_params, source_params),
167+
(transfer.target_params, target_params),
168+
(transfer.strategy_params, strategy_params),
169+
]:
170+
for key in old:
171+
if key not in new or new[key] is None:
172+
new[key] = old[key]
173+
171174
return await self._update(
172175
Transfer.id == transfer.id,
173176
name=name or transfer.name,
@@ -179,6 +182,7 @@ async def update(
179182
target_connection_id=target_connection_id or transfer.target_connection_id,
180183
source_params=source_params,
181184
target_params=target_params,
185+
transformations=transformations or transfer.transformations,
182186
queue_id=new_queue_id or transfer.queue_id,
183187
)
184188
except IntegrityError as e:

syncmaster/dto/transfers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class TransferDTO:
1515
@dataclass
1616
class DBTransferDTO(TransferDTO):
1717
table_name: str
18+
transformations: list[dict] | None = None
1819

1920

2021
@dataclass
@@ -23,6 +24,7 @@ class FileTransferDTO(TransferDTO):
2324
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
2425
options: dict
2526
df_schema: dict | None = None
27+
transformations: list[dict] | None = None
2628

2729
_format_parsers = {
2830
"csv": CSV,

syncmaster/schemas/v1/connections/oracle.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class CreateOracleConnectionDataSchema(BaseModel):
2424
additional_params: dict = Field(default_factory=dict)
2525

2626
@model_validator(mode="before")
27-
def check_owner_id(cls, values):
27+
def validate_connection_identifiers(cls, values):
2828
sid, service_name = values.get("sid"), values.get("service_name")
2929
if sid and service_name:
3030
raise ValueError("You must specify either sid or service_name but not both")
@@ -47,7 +47,7 @@ class UpdateOracleConnectionDataSchema(BaseModel):
4747
additional_params: dict | None = Field(default_factory=dict)
4848

4949
@model_validator(mode="before")
50-
def check_owner_id(cls, values):
50+
def validate_connection_identifiers(cls, values):
5151
sid, service_name = values.get("sid"), values.get("service_name")
5252
if sid and service_name:
5353
raise ValueError("You must specify either sid or service_name but not both")

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from __future__ import annotations
44

5-
from pydantic import BaseModel, Field, model_validator
5+
from typing import Annotated
6+
7+
from pydantic import BaseModel, Field, field_validator, model_validator
68

79
from syncmaster.schemas.v1.connections.connection import ReadConnectionSchema
810
from syncmaster.schemas.v1.page import PageSchema
@@ -27,6 +29,9 @@
2729
S3ReadTransferTarget,
2830
)
2931
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
32+
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
33+
DataframeRowsFilter,
34+
)
3035
from syncmaster.schemas.v1.types import NameConstr
3136

3237
ReadTransferSchemaSource = (
@@ -97,6 +102,8 @@
97102
| None
98103
)
99104

105+
TransformationSchema = DataframeRowsFilter
106+
100107

101108
class CopyTransferSchema(BaseModel):
102109
new_group_id: int
@@ -129,6 +136,9 @@ class ReadTransferSchema(BaseModel):
129136
...,
130137
discriminator="type",
131138
)
139+
transformations: list[Annotated[TransformationSchema, Field(..., discriminator="type")]] = Field(
140+
default_factory=list,
141+
)
132142

133143
class Config:
134144
from_attributes = True
@@ -158,9 +168,12 @@ class CreateTransferSchema(BaseModel):
158168
discriminator="type",
159169
description="Incremental or archive download options",
160170
)
171+
transformations: list[
172+
Annotated[TransformationSchema, Field(None, discriminator="type", description="List of transformations")]
173+
] = Field(default_factory=list)
161174

162175
@model_validator(mode="before")
163-
def check_owner_id(cls, values):
176+
def validate_scheduling(cls, values):
164177
is_scheduled, schedule = values.get("is_scheduled"), values.get("schedule")
165178
if is_scheduled and schedule is None:
166179
# TODO: validate the cron string
@@ -179,6 +192,7 @@ class UpdateTransferSchema(BaseModel):
179192
source_params: UpdateTransferSchemaSource = Field(discriminator="type", default=None)
180193
target_params: UpdateTransferSchemaTarget = Field(discriminator="type", default=None)
181194
strategy_params: FullStrategy | IncrementalStrategy | None = Field(discriminator="type", default=None)
195+
transformations: list[Annotated[TransformationSchema, Field(discriminator="type", default=None)]] = None
182196

183197

184198
class ReadFullTransferSchema(ReadTransferSchema):
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0

0 commit comments

Comments
 (0)