Skip to content

Commit d39ad6e

Browse files
author
Ilyas Gasanov
committed
[DOP-22344] Add transformations for Transfers with file filtering
1 parent 40a0857 commit d39ad6e

File tree

17 files changed

+655
-198
lines changed

17 files changed

+655
-198
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add transformations for **Transfers** with file filtering

poetry.lock

Lines changed: 396 additions & 168 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pyjwt = { version = "^2.10.1", optional = true }
5959
jinja2 = { version = "^3.1.4", optional = true }
6060
python-multipart = { version = ">=0.0.9,<0.0.21", optional = true }
6161
celery = { version = "^5.4.0", optional = true }
62-
onetl = { version = "^0.12.0", extras = ["spark"], optional = true }
62+
onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", rev = "develop", extras = ["spark"] }
6363
pyyaml = {version = "*", optional = true}
6464
# due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
6565
psycopg = { version = ">=3.1.0,<3.2.5", extras = ["binary"], optional = true }
@@ -132,7 +132,7 @@ pytest-randomly = "^3.15.0"
132132
pytest-deadfixtures = "^2.2.1"
133133
pytest-mock = "^3.14.0"
134134
pytest-lazy-fixtures = "^1.1.1"
135-
onetl = {extras = ["spark", "s3", "hdfs", "files"], version = "^0.12.0"}
135+
onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", rev = "develop", extras = ["spark", "s3", "hdfs", "files"] }
136136
faker = ">=33.3,<36.0"
137137
coverage = "^7.6.1"
138138
gevent = "^24.2.1"

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@
6565
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
6666
DataframeRowsFilter,
6767
)
68+
from syncmaster.schemas.v1.transfers.transformations.file_metadata_filter import (
69+
FileMetadataFilter,
70+
)
6871
from syncmaster.schemas.v1.types import NameConstr
6972

7073
ReadTransferSchemaSource = (
@@ -165,7 +168,7 @@
165168
| None
166169
)
167170

168-
TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter
171+
TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter | FileMetadataFilter
169172

170173

171174
class CopyTransferSchema(BaseModel):
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Annotated, Literal
4+
5+
from pydantic import BaseModel, Field
6+
7+
from syncmaster.schemas.v1.transformation_types import FILE_METADATA_FILTER
8+
9+
10+
class BaseMetadataFilter(BaseModel):
11+
value: str
12+
13+
14+
class NameGlobFilter(BaseMetadataFilter):
15+
type: Literal["name_glob"]
16+
17+
18+
class NameRegexpFilter(BaseMetadataFilter):
19+
type: Literal["name_regexp"]
20+
21+
22+
class FileSizeMinFilter(BaseMetadataFilter):
23+
type: Literal["file_size_min"]
24+
25+
26+
class FileSizeMaxFilter(BaseMetadataFilter):
27+
type: Literal["file_size_max"]
28+
29+
30+
MetadataFilter = NameGlobFilter | NameRegexpFilter | FileSizeMinFilter | FileSizeMaxFilter
31+
32+
33+
class FileMetadataFilter(BaseModel):
34+
type: FILE_METADATA_FILTER
35+
filters: list[Annotated[MetadataFilter, Field(..., discriminator="type")]] = Field(default_factory=list)

syncmaster/schemas/v1/transformation_types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44

55
DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
66
DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]
7+
FILE_METADATA_FILTER = Literal["file_metadata_filter"]

syncmaster/worker/handlers/db/base.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,15 @@ def _get_rows_filter_expression(self) -> str | None:
7979
if transformation["type"] == "dataframe_rows_filter":
8080
expressions.extend(transformation["filters"])
8181

82-
if expressions:
83-
return self._make_rows_filter_expression(expressions)
84-
85-
return None
82+
return self._make_rows_filter_expression(expressions)
8683

8784
def _get_columns_filter_expressions(self) -> list[str] | None:
8885
expressions = []
8986
for transformation in self.transfer_dto.transformations:
9087
if transformation["type"] == "dataframe_columns_filter":
9188
expressions.extend(transformation["filters"])
9289

93-
if expressions:
94-
return self._make_columns_filter_expressions(expressions)
95-
96-
return None
90+
return self._make_columns_filter_expressions(expressions)
9791

9892
@staticmethod
9993
def _quote_field(field: str) -> str:

syncmaster/worker/handlers/file/base.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def write(self, df: DataFrame) -> None:
6868

6969
return writer.run(df=df)
7070

71-
def _make_rows_filter_expression(self, filters: list[dict]) -> str:
71+
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
7272
expressions = []
7373
for filter in filters:
7474
field = filter["field"]
@@ -77,7 +77,7 @@ def _make_rows_filter_expression(self, filters: list[dict]) -> str:
7777

7878
expressions.append(f"{field} {op} '{value}'" if value is not None else f"{field} {op}")
7979

80-
return " AND ".join(expressions)
80+
return " AND ".join(expressions) or None
8181

8282
def _make_columns_filter_expressions(self, filters: list[dict]) -> list[str] | None:
8383
# TODO: another approach is to use df.select(col("col1"), col("col2").alias("new_col2"), ...)
@@ -103,18 +103,12 @@ def _get_rows_filter_expression(self) -> str | None:
103103
if transformation["type"] == "dataframe_rows_filter":
104104
expressions.extend(transformation["filters"])
105105

106-
if expressions:
107-
return self._make_rows_filter_expression(expressions)
108-
109-
return None
106+
return self._make_rows_filter_expression(expressions)
110107

111108
def _get_columns_filter_expressions(self) -> list[str] | None:
112109
expressions = []
113110
for transformation in self.transfer_dto.transformations:
114111
if transformation["type"] == "dataframe_columns_filter":
115112
expressions.extend(transformation["filters"])
116113

117-
if expressions:
118-
return self._make_columns_filter_expressions(expressions)
119-
120-
return None
114+
return self._make_columns_filter_expressions(expressions)

syncmaster/worker/handlers/file/protocol.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing import TYPE_CHECKING
88

99
from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader
10+
from onetl.file.filter import FileSizeRange, Glob, Regexp
1011

1112
from syncmaster.worker.handlers.file.base import FileHandler
1213

@@ -23,6 +24,7 @@ def read(self) -> DataFrame:
2324
connection=self.connection,
2425
source_path=self.transfer_dto.directory_path,
2526
local_path=self.temp_dir.name,
27+
filters=self._get_file_metadata_filters(),
2628
)
2729
downloader.run()
2830

@@ -65,3 +67,28 @@ def write(self, df: DataFrame) -> None:
6567
options=self.transfer_dto.options,
6668
)
6769
uploader.run()
70+
71+
def _make_file_metadata_filters(self, filters: list[dict]) -> list[Glob | Regexp | FileSizeRange]:
72+
processed_filters = []
73+
for filter in filters:
74+
filter_type = filter["type"]
75+
value = filter["value"]
76+
77+
if filter_type == "name_glob":
78+
processed_filters.append(Glob(value))
79+
elif filter_type == "name_regexp":
80+
processed_filters.append(Regexp(value))
81+
elif filter_type == "file_size_min":
82+
processed_filters.append(FileSizeRange(min=value))
83+
elif filter_type == "file_size_max":
84+
processed_filters.append(FileSizeRange(max=value))
85+
86+
return processed_filters
87+
88+
def _get_file_metadata_filters(self) -> list[Glob | Regexp | FileSizeRange]:
89+
expressions = []
90+
for transformation in self.transfer_dto.transformations:
91+
if transformation["type"] == "file_metadata_filter":
92+
expressions.extend(transformation["filters"])
93+
94+
return self._make_file_metadata_filters(expressions)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
ID,PHONE_NUMBER,REGION,NUMBER,BIRTH_DATE,REGISTERED_AT,ACCOUNT_BALANCE
2+
7282,+7 (691) 449-06-79,клх Ейск,1,2011-04-26,2006-02-14T09:36:15.589307+00:00,5596.5715
3+
5716,8 (852) 546-3243,г. Карачаевск,2,2020-04-19,2011-05-30T03:18:33.454814+00:00,7161.7426
4+
815,+1-777-622-4699,South Ashleyshire,3,2014-11-20,1998-11-09T22:46:12.228133+00:00,3897.5482
5+
7497,278-262-8115,ст. Комсомольск-на-Амуре,4,2004-02-19,1991-02-08T01:34:38.319212+00:00,5475.3048
6+
7570,3857284085,клх Химки,5,1991-04-17,2012-09-24T19:44:52.342526+00:00,8683.9899
7+
6160,+7 438 458 0404,ст. Балашиха,6,2013-10-05,2018-08-07T14:15:30.500278+00:00,397.8062
8+
3355,8 (727) 522-68-10,Careyview,7,1984-09-13,1977-09-21T08:02:16.691351+00:00,9984.1109
9+
4399,8 (950) 887-6214,Meyerport,8,2000-12-28,1991-01-26T19:49:01.052404+00:00,2138.7511
10+
4438,+7 (986) 346-45-32,клх Новый Оскол,9,1976-04-05,1987-10-01T19:40:43.435484+00:00,5379.6649
11+
3554,+1-241-698-1734x68678,ст. Грозный,10,1972-04-19,2003-10-19T23:39:25.607917+00:00,7344.857
12+
4376,635.306.7174x6895,с. Северодвинск,11,2009-12-19,2014-12-10T23:57:38.978435+00:00,90.9397
13+
6686,001-439-525-7944x0894,к. Охотск,12,2007-02-20,1982-12-11T21:36:51.431771+00:00,2012.0701
14+
4530,83371734832,г. Гаврилов-Ям,13,2000-02-08,1990-09-26T14:14:24.544322+00:00,9158.2148
15+
7229,+7 (928) 455-59-50,с. Петрозаводск,14,1987-02-22,2021-03-29T21:17:08.110878+00:00,9495.4051
16+
7228,479-469-4465,New Christopher,15,1976-04-25,1976-07-21T10:25:29.538945+00:00,5459.1735
17+
4306,001-431-242-8450x83170,Port Timothyfort,16,1990-04-28,1997-12-01T03:02:08.306977+00:00,1746.0504
18+
4458,+70438690715,п. Балтийск,17,2016-09-05,2024-01-11T06:34:09.873247+00:00,8563.8989
19+
8589,828-839-2009x5127,Baileytown,18,1996-01-01,2003-11-05T22:26:28.069468+00:00,707.4749
20+
6180,401.205.7356,Karenside,19,1989-04-05,2010-12-07T06:18:41.647421+00:00,7321.6199
21+
6130,642-456-8305x73206,North Darlene,20,2001-08-10,1991-11-20T05:45:19.379241+00:00,7529.7984
22+
4874,+73613741079,ст. Тамбей,21,2019-08-17,1999-09-07T16:58:59.105933+00:00,2465.4116
23+
6662,(882)500-1941x491,ст. Пермь,22,1995-07-10,1970-04-09T18:18:03.033712+00:00,8278.2693
24+
4301,+7 (535) 260-55-27,New Christopher,23,1992-11-21,1994-12-05T13:46:34.451286+00:00,1974.0252
25+
1794,001-933-371-2198,с. Приморско-Ахтарск,24,2013-05-19,1998-10-29T17:15:46.207609+00:00,8753.7965
26+
5096,001-499-346-5978x39210,ст. Лотошино,25,1995-01-18,1993-03-29T00:11:40.463076+00:00,4226.1755
27+
5605,8 (413) 673-89-27,Wolfeville,26,1990-03-05,1994-10-20T17:38:21.039941+00:00,5931.4869
28+
9845,+7 366 768 7792,East Zachary,27,2017-05-12,2001-08-28T13:15:21.965741+00:00,8716.9687

0 commit comments

Comments
 (0)