
Commit 407f230

Author: Ilyas Gasanov
[DOP-22344] Add transformations for Transfers with file filtering

1 parent 6e1f1f9 · commit 407f230

File tree: 18 files changed, +663 −202 lines

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Add transformations for **Transfers** with file filtering

poetry.lock

Lines changed: 396 additions & 168 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 2 additions & 2 deletions

@@ -59,7 +59,7 @@ pyjwt = { version = "^2.10.1", optional = true }
 jinja2 = { version = "^3.1.4", optional = true }
 python-multipart = { version = ">=0.0.9,<0.0.21", optional = true }
 celery = { version = "^5.4.0", optional = true }
-onetl = { version = "^0.12.0", extras = ["spark"], optional = true }
+onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", rev = "develop", extras = ["spark"] }
 pyyaml = {version = "*", optional = true}
 # due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
 psycopg = { version = ">=3.1.0,<3.2.5", extras = ["binary"], optional = true }

@@ -132,7 +132,7 @@ pytest-randomly = "^3.15.0"
 pytest-deadfixtures = "^2.2.1"
 pytest-mock = "^3.14.0"
 pytest-lazy-fixtures = "^1.1.1"
-onetl = {extras = ["spark", "s3", "hdfs", "files"], version = "^0.12.0"}
+onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", rev = "develop", extras = ["spark", "s3", "hdfs", "files"] }
 faker = ">=33.3,<36.0"
 coverage = "^7.6.1"
 gevent = "^24.2.1"

syncmaster/schemas/v1/transfers/__init__.py

Lines changed: 4 additions & 1 deletion

@@ -65,6 +65,9 @@
 from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
     DataframeRowsFilter,
 )
+from syncmaster.schemas.v1.transfers.transformations.file_metadata_filter import (
+    FileMetadataFilter,
+)
 from syncmaster.schemas.v1.types import NameConstr

 ReadTransferSchemaSource = (
@@ -165,7 +168,7 @@
     | None
 )

-TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter
+TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter | FileMetadataFilter


 class CopyTransferSchema(BaseModel):

syncmaster/schemas/v1/transfers/transformations/file_metadata_filter.py

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from typing import Annotated, Literal
+
+from pydantic import BaseModel, Field
+
+from syncmaster.schemas.v1.transformation_types import FILE_METADATA_FILTER
+
+
+class BaseMetadataFilter(BaseModel):
+    value: str
+
+
+class NameGlobFilter(BaseMetadataFilter):
+    type: Literal["name_glob"]
+
+
+class NameRegexpFilter(BaseMetadataFilter):
+    type: Literal["name_regexp"]
+
+
+class FileSizeMinFilter(BaseMetadataFilter):
+    type: Literal["file_size_min"]
+
+
+class FileSizeMaxFilter(BaseMetadataFilter):
+    type: Literal["file_size_max"]
+
+
+MetadataFilter = NameGlobFilter | NameRegexpFilter | FileSizeMinFilter | FileSizeMaxFilter
+
+
+class FileMetadataFilter(BaseModel):
+    type: FILE_METADATA_FILTER
+    filters: list[Annotated[MetadataFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
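
For orientation, a minimal usage sketch of the new schema, assuming pydantic v2 and its model_validate API (the payload values are made up for this example). The Field(discriminator="type") annotation makes pydantic dispatch each dict in filters to the subclass whose type literal matches:

    from syncmaster.schemas.v1.transfers.transformations.file_metadata_filter import (
        FileMetadataFilter,
    )

    # Illustrative payload; the filter values are invented for this sketch
    payload = {
        "type": "file_metadata_filter",
        "filters": [
            {"type": "name_glob", "value": "*.csv"},
            {"type": "file_size_min", "value": "1KiB"},
        ],
    }

    transformation = FileMetadataFilter.model_validate(payload)
    # The "type" discriminator routes each dict to the matching subclass:
    # [NameGlobFilter(value='*.csv', type='name_glob'),
    #  FileSizeMinFilter(value='1KiB', type='file_size_min')]
    print(transformation.filters)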

syncmaster/schemas/v1/transformation_types.py

Lines changed: 1 addition & 0 deletions

@@ -4,3 +4,4 @@

 DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
 DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]
+FILE_METADATA_FILTER = Literal["file_metadata_filter"]

syncmaster/server/providers/auth/keycloak_provider.py

Lines changed: 8 additions & 4 deletions

@@ -94,8 +94,8 @@ async def get_current_user(self, access_token: str, *args, **kwargs) -> Any:
         try:
             new_tokens = await self.refresh_access_token(refresh_token)

-            new_access_token = new_tokens.get("access_token")
-            new_refresh_token = new_tokens.get("refresh_token")
+            new_access_token = new_tokens["access_token"]
+            new_refresh_token = new_tokens["refresh_token"]
             request.session["access_token"] = new_access_token
             request.session["refresh_token"] = new_refresh_token

@@ -107,11 +107,15 @@ async def get_current_user(self, access_token: str, *args, **kwargs) -> Any:
             log.debug("Failed to refresh access token: %s", e)
             self.redirect_to_auth(request.url.path)

+        if not token_info:
+            raise AuthorizationError("Invalid token payload")
+
         # these names are hardcoded in keycloak:
         # https://github.com/keycloak/keycloak/blob/3ca3a4ad349b4d457f6829eaf2ae05f1e01408be/core/src/main/java/org/keycloak/representations/IDToken.java
+        # TODO: make sure which fields are guaranteed
         user_id = token_info.get("sub")
-        login = token_info.get("preferred_username")
-        email = token_info.get("email")
+        login = token_info["preferred_username"]
+        email = token_info["email"]
         first_name = token_info.get("given_name")
         middle_name = token_info.get("middle_name")
         last_name = token_info.get("family_name")
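
The switch from .get() to item access changes the failure mode: .get() returns None for a missing claim and lets it propagate silently (surfacing as login=None or email=None far from the cause), while indexing raises KeyError at the exact line where the claim is absent. A quick illustration with a made-up token payload:

    # Hypothetical token payload missing the "email" claim
    token_info = {"sub": "1234", "preferred_username": "i.gasanov"}

    print(token_info.get("email"))  # None -- error surfaces much later, if at all
    token_info["email"]             # KeyError: 'email' -- fails fast, right here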

syncmaster/worker/handlers/db/base.py

Lines changed: 2 additions & 8 deletions

@@ -79,21 +79,15 @@ def _get_rows_filter_expression(self) -> str | None:
             if transformation["type"] == "dataframe_rows_filter":
                 expressions.extend(transformation["filters"])

-        if expressions:
-            return self._make_rows_filter_expression(expressions)
-
-        return None
+        return self._make_rows_filter_expression(expressions)

     def _get_columns_filter_expressions(self) -> list[str] | None:
         expressions = []
         for transformation in self.transfer_dto.transformations:
             if transformation["type"] == "dataframe_columns_filter":
                 expressions.extend(transformation["filters"])

-        if expressions:
-            return self._make_columns_filter_expressions(expressions)
-
-        return None
+        return self._make_columns_filter_expressions(expressions)

     @staticmethod
     def _quote_field(field: str) -> str:

syncmaster/worker/handlers/file/base.py

Lines changed: 4 additions & 10 deletions

@@ -68,7 +68,7 @@ def write(self, df: DataFrame) -> None:

         return writer.run(df=df)

-    def _make_rows_filter_expression(self, filters: list[dict]) -> str:
+    def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
         expressions = []
         for filter in filters:
             field = filter["field"]
@@ -77,7 +77,7 @@ def _make_rows_filter_expression(self, filters: list[dict]) -> str:

             expressions.append(f"{field} {op} '{value}'" if value is not None else f"{field} {op}")

-        return " AND ".join(expressions)
+        return " AND ".join(expressions) or None

     def _make_columns_filter_expressions(self, filters: list[dict]) -> list[str] | None:
         # TODO: another approach is to use df.select(col("col1"), col("col2").alias("new_col2"), ...)
@@ -103,18 +103,12 @@ def _get_rows_filter_expression(self) -> str | None:
             if transformation["type"] == "dataframe_rows_filter":
                 expressions.extend(transformation["filters"])

-        if expressions:
-            return self._make_rows_filter_expression(expressions)
-
-        return None
+        return self._make_rows_filter_expression(expressions)

     def _get_columns_filter_expressions(self) -> list[str] | None:
         expressions = []
         for transformation in self.transfer_dto.transformations:
             if transformation["type"] == "dataframe_columns_filter":
                 expressions.extend(transformation["filters"])

-        if expressions:
-            return self._make_columns_filter_expressions(expressions)
-
-        return None
+        return self._make_columns_filter_expressions(expressions)
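
The "or None" idiom is what lets both _get_* callers drop their explicit `if expressions:` guards: joining an empty list yields an empty string, which is falsy, so the whole expression collapses to None when no filters are supplied:

    print(" AND ".join([]) or None)                  # None -- no filters supplied
    print(" AND ".join(["a > 1", "b = 2"]) or None)  # a > 1 AND b = 2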

syncmaster/worker/handlers/file/protocol.py

Lines changed: 27 additions & 0 deletions

@@ -7,6 +7,7 @@
 from typing import TYPE_CHECKING

 from onetl.file import FileDFReader, FileDFWriter, FileDownloader, FileUploader
+from onetl.file.filter import FileSizeRange, Glob, Regexp

 from syncmaster.worker.handlers.file.base import FileHandler

@@ -23,6 +24,7 @@ def read(self) -> DataFrame:
             connection=self.connection,
             source_path=self.transfer_dto.directory_path,
             local_path=self.temp_dir.name,
+            filters=self._get_file_metadata_filters(),
         )
         downloader.run()

@@ -65,3 +67,28 @@ def write(self, df: DataFrame) -> None:
             options=self.transfer_dto.options,
         )
         uploader.run()
+
+    def _make_file_metadata_filters(self, filters: list[dict]) -> list[Glob | Regexp | FileSizeRange]:
+        processed_filters = []
+        for filter in filters:
+            filter_type = filter["type"]
+            value = filter["value"]
+
+            if filter_type == "name_glob":
+                processed_filters.append(Glob(value))
+            elif filter_type == "name_regexp":
+                processed_filters.append(Regexp(value))
+            elif filter_type == "file_size_min":
+                processed_filters.append(FileSizeRange(min=value))
+            elif filter_type == "file_size_max":
+                processed_filters.append(FileSizeRange(max=value))
+
+        return processed_filters
+
+    def _get_file_metadata_filters(self) -> list[Glob | Regexp | FileSizeRange]:
+        expressions = []
+        for transformation in self.transfer_dto.transformations:
+            if transformation["type"] == "file_metadata_filter":
+                expressions.extend(transformation["filters"])
+
+        return self._make_file_metadata_filters(expressions)
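
End to end, the schema-level metadata filters map onto onetl's built-in file filters, which FileDownloader applies while listing the remote directory, so non-matching files are never downloaded. A minimal standalone sketch under assumed inputs (the connection object and paths are hypothetical):

    from onetl.file import FileDownloader
    from onetl.file.filter import FileSizeRange, Glob

    downloader = FileDownloader(
        connection=s3_connection,       # hypothetical, configured elsewhere
        source_path="/remote/reports",  # hypothetical source directory
        local_path="/tmp/syncmaster",   # hypothetical staging directory
        filters=[
            Glob("*.csv"),               # from a name_glob filter
            FileSizeRange(min="10KiB"),  # from a file_size_min filter
            FileSizeRange(max="1GiB"),   # from a file_size_max filter
        ],
    )
    downloader.run()  # only *.csv files within the size range are downloaded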
