
Commit 9d1e591

[DOP-22146] Set the names of saved files (#201)
Parent: 98f006c

26 files changed: +319 additions, -47 deletions
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Use the `file_name_template` field to specify the names of saved files
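
A minimal sketch of how such a template renders, assuming the default template and the placeholders introduced in this commit ({index}, {extension}, {run_id}, {run_created_at}); the run values below are invented for illustration:

    # Hypothetical values; the handler formats run_created_at with "%Y_%m_%d_%H_%M_%S".
    template = "{run_created_at}-{index}.{extension}"
    name = template.format(run_created_at="2024_05_01_12_30_00", index=0, extension="csv")
    # name == "2024_05_01_12_30_00-0.csv"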

syncmaster/dto/runs.py

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from dataclasses import dataclass
+from datetime import datetime
+
+
+@dataclass
+class RunDTO:
+    id: str
+    created_at: datetime
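
For context, a small sketch of how this DTO is meant to be filled (mirroring the run_data dict built in controller.py below; the values here are hypothetical):

    from datetime import datetime
    from syncmaster.dto.runs import RunDTO

    # run.id and run.created_at come from the actual run; hardcoded here only for illustration.
    run_dto = RunDTO(id="42", created_at=datetime(2024, 5, 1, 12, 30))
    # run_dto.id and run_dto.created_at feed the {run_id} and {run_created_at} placeholders.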

syncmaster/dto/transfers.py

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ class FileTransferDTO(TransferDTO):
     directory_path: str
     file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
     options: dict
+    file_name_template: str | None = None
     df_schema: dict | None = None
     transformations: list[dict] | None = None

syncmaster/schemas/v1/connections/hdfs.py

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ class ReadHDFSConnectionDataSchema(BaseModel):


 class UpdateHDFSConnectionDataSchema(BaseModel):
-    cluster: str
+    cluster: str | None = None


 class CreateHDFSConnectionSchema(CreateConnectionBaseSchema):

syncmaster/schemas/v1/transfers/file/base.py

Lines changed: 9 additions & 3 deletions
@@ -2,8 +2,9 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations

+import re
 from pathlib import PurePosixPath
-from typing import Any
+from typing import Any, ClassVar

 from pydantic import BaseModel, Field, field_validator

@@ -63,11 +64,13 @@ class CreateFileTransferTarget(BaseModel):
         discriminator="type",
     )
     file_name_template: str = Field(
-        default="{run_created_at}_{index}.{extension}",
+        default="{run_created_at}-{index}.{extension}",
         description="Template for file naming with required placeholders 'index' and 'extension'",
     )
     options: dict[str, Any] = Field(default_factory=dict)

+    FILE_NAME_PATTERN: ClassVar[re.Pattern] = re.compile(r"^[a-zA-Z0-9_.{}-]+$")
+
     class Config:
         arbitrary_types_allowed = True

@@ -80,7 +83,10 @@ def _directory_path_is_valid_path(cls, value):

     @field_validator("file_name_template")
     @classmethod
-    def validate_file_name_template(cls, value):
+    def validate_file_name_template(cls, value: str) -> str:
+        if not cls.FILE_NAME_PATTERN.match(value):
+            raise ValueError("Template contains invalid characters. Allowed: letters, numbers, '.', '_', '-', '{', '}'")
+
         required_keys = {"index", "extension"}
         placeholders = {key for key in required_keys if f"{{{key}}}" in value}
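
A rough sketch of the validation behavior added here, restated outside the Pydantic model; the placeholder check at the end is paraphrased from the surrounding context lines and may differ in wording from the actual schema code:

    import re

    FILE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9_.{}-]+$")

    def validate_file_name_template(value: str) -> str:
        # Only letters, digits, '.', '_', '-', '{', '}' are allowed.
        if not FILE_NAME_PATTERN.match(value):
            raise ValueError("Template contains invalid characters")
        # Both {index} and {extension} placeholders must be present.
        required_keys = {"index", "extension"}
        placeholders = {key for key in required_keys if f"{{{key}}}" in value}
        if placeholders != required_keys:
            raise ValueError("Missing required placeholders")
        return value

    validate_file_name_template("{run_created_at}-{index}.{extension}")  # passes
    # validate_file_name_template("report {index}.{extension}") would fail: space is not allowed.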

syncmaster/worker/controller.py

Lines changed: 19 additions & 1 deletion
@@ -20,6 +20,7 @@
     SFTPConnectionDTO,
     WebDAVConnectionDTO,
 )
+from syncmaster.dto.runs import RunDTO
 from syncmaster.dto.transfers import (
     ClickhouseTransferDTO,
     FTPSTransferDTO,

@@ -60,66 +61,79 @@
         HiveHandler,
         HiveConnectionDTO,
         HiveTransferDTO,
+        RunDTO,
     ),
     "oracle": (
         OracleHandler,
         OracleConnectionDTO,
         OracleTransferDTO,
+        RunDTO,
     ),
     "clickhouse": (
         ClickhouseHandler,
         ClickhouseConnectionDTO,
         ClickhouseTransferDTO,
+        RunDTO,
     ),
     "mssql": (
         MSSQLHandler,
         MSSQLConnectionDTO,
         MSSQLTransferDTO,
+        RunDTO,
     ),
     "mysql": (
         MySQLHandler,
         MySQLConnectionDTO,
         MySQLTransferDTO,
+        RunDTO,
     ),
     "postgres": (
         PostgresHandler,
         PostgresConnectionDTO,
         PostgresTransferDTO,
+        RunDTO,
     ),
     "s3": (
         S3Handler,
         S3ConnectionDTO,
         S3TransferDTO,
+        RunDTO,
     ),
     "hdfs": (
         HDFSHandler,
         HDFSConnectionDTO,
         HDFSTransferDTO,
+        RunDTO,
     ),
     "sftp": (
         SFTPHandler,
         SFTPConnectionDTO,
         SFTPTransferDTO,
+        RunDTO,
     ),
     "ftp": (
         FTPHandler,
         FTPConnectionDTO,
         FTPTransferDTO,
+        RunDTO,
     ),
     "ftps": (
         FTPSHandler,
         FTPSConnectionDTO,
         FTPSTransferDTO,
+        RunDTO,
     ),
     "samba": (
         SambaHandler,
         SambaConnectionDTO,
         SambaTransferDTO,
+        RunDTO,
     ),
     "webdav": (
         WebDAVHandler,
         WebDAVConnectionDTO,
         WebDAVTransferDTO,
+        RunDTO,
     ),
 }

@@ -141,13 +155,15 @@ def __init__(
         self.run = run
         self.source_handler = self.get_handler(
             connection_data=source_connection.data,
+            run_data={"id": run.id, "created_at": run.created_at},
             transfer_params=run.transfer.source_params,
             transformations=run.transfer.transformations,
             connection_auth_data=source_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
         )
         self.target_handler = self.get_handler(
             connection_data=target_connection.data,
+            run_data={"id": run.id, "created_at": run.created_at},
             transfer_params=run.transfer.target_params,
             transformations=run.transfer.transformations,
             connection_auth_data=target_auth_data,

@@ -175,6 +191,7 @@ def get_handler(
         self,
         connection_data: dict[str, Any],
         connection_auth_data: dict,
+        run_data: dict[str, Any],
         transfer_params: dict[str, Any],
         transformations: list[dict],
         temp_dir: TemporaryDirectory,

@@ -186,10 +203,11 @@ def get_handler(
         if connection_handler_proxy.get(handler_type, None) is None:
             raise ConnectionTypeNotRecognizedError

-        handler, connection_dto, transfer_dto = connection_handler_proxy[handler_type]
+        handler, connection_dto, transfer_dto, run_dto = connection_handler_proxy[handler_type]

         return handler(
             connection_dto=connection_dto(**connection_data),
             transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
+            run_dto=run_dto(**run_data),
             temp_dir=temp_dir,
         )

syncmaster/worker/handlers/base.py

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,7 @@
 from typing import TYPE_CHECKING

 from syncmaster.dto.connections import ConnectionDTO
+from syncmaster.dto.runs import RunDTO
 from syncmaster.dto.transfers import TransferDTO

 if TYPE_CHECKING:

@@ -20,10 +21,12 @@ def __init__(
         self,
         connection_dto: ConnectionDTO,
         transfer_dto: TransferDTO,
+        run_dto: RunDTO,
         temp_dir: TemporaryDirectory,
     ):
         self.connection_dto = connection_dto
         self.transfer_dto = transfer_dto
+        self.run_dto = run_dto
         self.temp_dir = temp_dir

     @abstractmethod

syncmaster/worker/handlers/file/base.py

Lines changed: 88 additions & 9 deletions
@@ -3,10 +3,12 @@

 from __future__ import annotations

+import os
 from typing import TYPE_CHECKING

 from onetl.base.base_file_df_connection import BaseFileDFConnection
-from onetl.file import FileDFReader, FileDFWriter
+from onetl.file import FileDFReader, FileDFWriter, FileMover
+from onetl.file.filter import Glob

 from syncmaster.dto.connections import ConnectionDTO
 from syncmaster.dto.transfers import FileTransferDTO

@@ -15,9 +17,18 @@
 if TYPE_CHECKING:
     from pyspark.sql.dataframe import DataFrame

+COLUMN_FORMATS = ("parquet", "orc")
+

 class FileHandler(Handler):
-    connection: BaseFileDFConnection
+    """
+    TODO: FileHandler is actually handler for FileDFWriter with remote FS (direct write).
+    FileProtocolHandler is handler for FileDFWriter with local FS (write via upload).
+    Maybe we should keep here only common methods,
+    like file name generator and split other ones to classes where the method is really used.
+    """
+
+    df_connection: BaseFileDFConnection
     connection_dto: ConnectionDTO
     transfer_dto: FileTransferDTO
     _operators = {

@@ -35,12 +46,29 @@ class FileHandler(Handler):
         "not_ilike": "NOT ILIKE",
         "regexp": "RLIKE",
     }
+    _compression_to_file_suffix = {
+        "gzip": "gz",
+        "snappy": "snappy",
+        "zlib": "zlib",
+        "lz4": "lz4",
+        "bzip2": "bz2",
+        "deflate": "deflate",
+    }
+    _file_format_to_file_suffix = {
+        "json": "json",
+        "jsonline": "jsonl",
+        "csv": "csv",
+        "xml": "xml",
+        "excel": "xlsx",
+        "parquet": "parquet",
+        "orc": "orc",
+    }

     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType

         reader = FileDFReader(
-            connection=self.connection,
+            connection=self.df_connection,
             format=self.transfer_dto.file_format,
             source_path=self.transfer_dto.directory_path,
             df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,

@@ -59,14 +87,65 @@ def read(self) -> DataFrame:
         return df

     def write(self, df: DataFrame) -> None:
-        writer = FileDFWriter(
-            connection=self.connection,
-            format=self.transfer_dto.file_format,
-            target_path=self.transfer_dto.directory_path,
-            options=self.transfer_dto.options,
+        tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id))
+        try:
+            writer = FileDFWriter(
+                connection=self.df_connection,
+                format=self.transfer_dto.file_format,
+                target_path=tmp_path,
+                options=self.transfer_dto.options,
+            )
+            writer.run(df=df)
+
+            self._rename_files(tmp_path)
+
+            mover = FileMover(
+                connection=self.file_connection,
+                source_path=tmp_path,
+                target_path=self.transfer_dto.directory_path,
+                # ignore .crc and other metadata files
+                filters=[Glob(f"*.{self._get_file_extension()}")],
+            )
+            mover.run()
+        finally:
+            self.file_connection.remove_dir(tmp_path, recursive=True)
+
+    def _rename_files(self, tmp_path: str) -> None:
+        files = self.file_connection.list_dir(tmp_path)
+
+        for index, file_name in enumerate(files):
+            extension = self._get_file_extension()
+            new_name = self._get_file_name(str(index), extension)
+            old_path = os.path.join(tmp_path, file_name)
+            new_path = os.path.join(tmp_path, new_name)
+            self.file_connection.rename_file(old_path, new_path)
+
+    def _get_file_name(self, index: str, extension: str) -> str:
+        return self.transfer_dto.file_name_template.format(
+            index=index,
+            extension=extension,
+            run_id=self.run_dto.id,
+            run_created_at=self.run_dto.created_at.strftime("%Y_%m_%d_%H_%M_%S"),
         )

-        return writer.run(df=df)
+    def _get_file_extension(self) -> str:
+        file_format = self.transfer_dto.file_format.__class__.__name__.lower()
+        extension_suffix = self._file_format_to_file_suffix[file_format]
+
+        compression = getattr(self.transfer_dto.file_format, "compression", "none")
+        if compression == "none":
+            return extension_suffix
+
+        compression_suffix = self._compression_to_file_suffix[compression]
+
+        # https://github.com/apache/parquet-java/blob/fb6f0be0323f5f52715b54b8c6602763d8d0128d/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java#L26-L33
+        if extension_suffix == "parquet" and compression_suffix == "lz4":
+            return "lz4hadoop.parquet"
+
+        if extension_suffix in COLUMN_FORMATS:
+            return f"{compression_suffix}.{extension_suffix}"
+
+        return f"{extension_suffix}.{compression_suffix}"

     def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
         expressions = []
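
To make the naming rules above concrete, here is a standalone re-statement of the suffix logic, assuming the mappings shown in the diff (not the project's actual helper):

    def file_extension(file_format: str, compression: str) -> str:
        # Simplified mirror of FileHandler._get_file_extension, for illustration only.
        format_suffix = {"csv": "csv", "jsonline": "jsonl", "parquet": "parquet", "orc": "orc"}[file_format]
        if compression == "none":
            return format_suffix
        compression_suffix = {"gzip": "gz", "snappy": "snappy", "lz4": "lz4"}[compression]
        if format_suffix == "parquet" and compression_suffix == "lz4":
            return "lz4hadoop.parquet"  # Hadoop-specific lz4 codec naming
        if format_suffix in ("parquet", "orc"):  # columnar formats: compression goes first
            return f"{compression_suffix}.{format_suffix}"
        return f"{format_suffix}.{compression_suffix}"  # row formats: extension first

    assert file_extension("csv", "gzip") == "csv.gz"
    assert file_extension("parquet", "snappy") == "snappy.parquet"

With the default template, a written file therefore ends up with a name like "2024_05_01_12_30_00-0.csv.gz" (timestamp hypothetical).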

syncmaster/worker/handlers/file/ftp.py

Lines changed: 2 additions & 2 deletions
@@ -18,12 +18,12 @@ class FTPHandler(FileProtocolHandler):
     connection_dto: FTPConnectionDTO

     def connect(self, spark: SparkSession) -> None:
-        self.connection = FTP(
+        self.file_connection = FTP(
             host=self.connection_dto.host,
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
-        self.local_connection = SparkLocalFS(
+        self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()

syncmaster/worker/handlers/file/ftps.py

Lines changed: 2 additions & 2 deletions
@@ -18,12 +18,12 @@ class FTPSHandler(FileProtocolHandler):
     connection_dto: FTPSConnectionDTO

     def connect(self, spark: SparkSession) -> None:
-        self.connection = FTPS(
+        self.file_connection = FTPS(
             host=self.connection_dto.host,
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
-        self.local_connection = SparkLocalFS(
+        self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
