Skip to content

Commit d83ad79

Browse files
[DOP-22350] Add transformations for Transfers with dataframe column filtering (#186)
1 parent 6a47361 commit d83ad79

File tree

23 files changed: 377 additions and 113 deletions.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add transformations for **Transfers** with dataframe column filtering

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
S3ReadTransferTarget,
3030
)
3131
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
32+
from syncmaster.schemas.v1.transfers.transformations.dataframe_columns_filter import (
33+
DataframeColumnsFilter,
34+
)
3235
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
3336
DataframeRowsFilter,
3437
)
@@ -102,7 +105,7 @@
102105
| None
103106
)
104107

105-
# Union of the supported transformation schemas; each member carries its own `type` literal.
TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter
106109

107110

108111
class CopyTransferSchema(BaseModel):
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Annotated, Literal
4+
5+
from pydantic import BaseModel, Field
6+
7+
from syncmaster.schemas.v1.transformation_types import DATAFRAME_COLUMNS_FILTER
8+
9+
10+
# Common part of every column-level operation: the name of the column it targets.
class BaseColumnsFilter(BaseModel):
    field: str


# Selects the column without modification.
class IncludeFilter(BaseColumnsFilter):
    type: Literal["include"]


# Selects the column under a new name given in `to`.
class RenameFilter(BaseColumnsFilter):
    type: Literal["rename"]
    to: str


# Selects the column converted to the type named in `as_type`.
class CastFilter(BaseColumnsFilter):
    type: Literal["cast"]
    as_type: str


# All column operations; the `type` literal tells the variants apart.
ColumnsFilter = IncludeFilter | RenameFilter | CastFilter


# Transformation holding an ordered list of column operations.
# `filters` defaults to an empty list, and each entry is resolved to a
# concrete filter class through the `type` discriminator.
class DataframeColumnsFilter(BaseModel):
    type: DATAFRAME_COLUMNS_FILTER
    filters: list[Annotated[ColumnsFilter, Field(..., discriminator="type")]] = Field(default_factory=list)

syncmaster/schemas/v1/transfers/transformations/dataframe_rows_filter.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,74 +7,74 @@
77
from syncmaster.schemas.v1.transformation_types import DATAFRAME_ROWS_FILTER
88

99

10-
# Common part of every row-level condition: the name of the column it tests.
class BaseRowsFilter(BaseModel):
    field: str


# --- Conditions that need only the column name -------------------------------


class IsNullFilter(BaseRowsFilter):
    type: Literal["is_null"]


class IsNotNullFilter(BaseRowsFilter):
    type: Literal["is_not_null"]


# --- Conditions that compare the column against a string `value` -------------


class EqualFilter(BaseRowsFilter):
    type: Literal["equal"]
    value: str


class NotEqualFilter(BaseRowsFilter):
    type: Literal["not_equal"]
    value: str


class GreaterThanFilter(BaseRowsFilter):
    type: Literal["greater_than"]
    value: str


class GreaterOrEqualFilter(BaseRowsFilter):
    type: Literal["greater_or_equal"]
    value: str


class LessThanFilter(BaseRowsFilter):
    type: Literal["less_than"]
    value: str


class LessOrEqualFilter(BaseRowsFilter):
    type: Literal["less_or_equal"]
    value: str


class LikeFilter(BaseRowsFilter):
    type: Literal["like"]
    value: str


class ILikeFilter(BaseRowsFilter):
    type: Literal["ilike"]
    value: str


class NotLikeFilter(BaseRowsFilter):
    type: Literal["not_like"]
    value: str


class NotILikeFilter(BaseRowsFilter):
    type: Literal["not_ilike"]
    value: str


class RegexpFilter(BaseRowsFilter):
    type: Literal["regexp"]
    value: str
7575

7676

77-
RowFilter = (
77+
RowsFilter = (
7878
IsNullFilter
7979
| IsNotNullFilter
8080
| EqualFilter
@@ -93,4 +93,4 @@ class RegexpFilter(BaseRowFilter):
9393

9494
# Transformation holding a list of row conditions.
# `filters` defaults to an empty list, and each entry is resolved to a
# concrete filter class through the `type` discriminator.
class DataframeRowsFilter(BaseModel):
    type: DATAFRAME_ROWS_FILTER
    filters: list[Annotated[RowsFilter, Field(..., discriminator="type")]] = Field(default_factory=list)

syncmaster/schemas/v1/transformation_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
from typing import Literal
44

55
# Discriminator values used in the `type` field of the transformation schemas.
DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]

syncmaster/worker/handlers/db/base.py

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ def read(self) -> DataFrame:
3838
reader = DBReader(
3939
connection=self.connection,
4040
table=self.transfer_dto.table_name,
41-
where=self._get_filter_expression(),
41+
where=self._get_rows_filter_expression(),
42+
columns=self._get_columns_filter_expressions(),
4243
)
4344
return reader.run()
4445

@@ -53,13 +54,47 @@ def write(self, df: DataFrame) -> None:
5354
def _normalize_column_names(self, df: DataFrame) -> DataFrame: ...
5455

5556
@abstractmethod
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
    """Build the row-filter expression for this database dialect.

    Args:
        filters: Row filter dicts collected from every
            ``dataframe_rows_filter`` transformation of the transfer.

    Returns:
        A single expression string that is passed as the reader's
        ``where`` argument, or ``None`` when there is nothing to filter on.
    """
5758

58-
def _get_filter_expression(self) -> str | None:
59-
filters = []
59+
def _make_columns_filter_expressions(self, filters: list[dict]) -> list[str] | None:
60+
expressions = []
61+
for filter in filters:
62+
filter_type = filter["type"]
63+
field = self._quote_field(filter["field"])
64+
65+
if filter_type == "include":
66+
expressions.append(field)
67+
elif filter_type == "rename":
68+
new_name = self._quote_field(filter["to"])
69+
expressions.append(f"{field} AS {new_name}")
70+
elif filter_type == "cast":
71+
cast_type = filter["as_type"]
72+
expressions.append(f"CAST({field} AS {cast_type}) AS {field}")
73+
74+
return expressions or None
75+
76+
def _get_rows_filter_expression(self) -> str | None:
77+
expressions = []
6078
for transformation in self.transfer_dto.transformations:
6179
if transformation["type"] == "dataframe_rows_filter":
62-
filters.extend(transformation["filters"])
63-
if filters:
64-
return self._make_filter_expression(filters)
80+
expressions.extend(transformation["filters"])
81+
82+
if expressions:
83+
return self._make_rows_filter_expression(expressions)
84+
6585
return None
86+
87+
def _get_columns_filter_expressions(self) -> list[str] | None:
88+
expressions = []
89+
for transformation in self.transfer_dto.transformations:
90+
if transformation["type"] == "dataframe_columns_filter":
91+
expressions.extend(transformation["filters"])
92+
93+
if expressions:
94+
return self._make_columns_filter_expressions(expressions)
95+
96+
return None
97+
98+
@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in double quotes for use as a SQL identifier.

    Embedded double quotes are doubled so the name cannot terminate the
    quoted identifier early.
    """
    escaped = field.replace('"', '""')
    return f'"{escaped}"'

syncmaster/worker/handlers/db/clickhouse.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
6464
df = df.withColumnRenamed(column_name, column_name.lower())
6565
return df
6666

67-
def _make_filter_expression(self, filters: list[dict]) -> str | None:
67+
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
6868
expressions = []
6969
for filter in filters:
70-
field = f'"{filter["field"]}"'
70+
field = self._quote_field(filter["field"])
7171
op = self._operators[filter["type"]]
7272
value = filter.get("value")
7373

syncmaster/worker/handlers/db/hive.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4040
df = df.withColumnRenamed(column_name, column_name.lower())
4141
return df
4242

43-
def _make_filter_expression(self, filters: list[dict]) -> str | None:
43+
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
4444
expressions = []
4545
for filter in filters:
4646
op = self._operators[filter["type"]]
47-
field = f"`{filter["field"]}`"
47+
field = self._quote_field(filter["field"])
4848
value = filter.get("value")
4949

5050
if value is None:
@@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
5959
expressions.append(f"{field} {op} '{value}'")
6060

6161
return " AND ".join(expressions) or None
62+
63+
@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in backticks for use as an identifier.

    Embedded backticks are doubled so the name cannot terminate the
    quoted identifier early.
    """
    escaped = field.replace("`", "``")
    return f"`{escaped}`"

syncmaster/worker/handlers/db/mssql.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4343
df = df.withColumnRenamed(column_name, column_name.lower())
4444
return df
4545

46-
def _make_filter_expression(self, filters: list[dict]) -> str | None:
46+
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
4747
expressions = []
4848
for filter in filters:
4949
op = self._operators[filter["type"]]
50-
field = f'"{filter["field"]}"'
50+
field = self._quote_field(filter["field"])
5151
value = filter.get("value")
5252

5353
if value is None:

syncmaster/worker/handlers/db/mysql.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4040
df = df.withColumnRenamed(column_name, column_name.lower())
4141
return df
4242

43-
def _make_filter_expression(self, filters: list[dict]) -> str | None:
43+
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
4444
expressions = []
4545
for filter in filters:
4646
op = self._operators[filter["type"]]
47-
field = f"`{filter["field"]}`"
47+
field = self._quote_field(filter["field"])
4848
value = filter.get("value")
4949

5050
if value is None:
@@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
5959
expressions.append(f"{field} {op} '{value}'")
6060

6161
return " AND ".join(expressions) or None
62+
63+
@staticmethod
def _quote_field(field: str) -> str:
    """Wrap a column name in backticks for use as an identifier.

    Embedded backticks are doubled so the name cannot terminate the
    quoted identifier early.
    """
    escaped = field.replace("`", "``")
    return f"`{escaped}`"

0 commit comments

Comments
 (0)