Commit 9c9ac03

Author: Ilyas Gasanov
[DOP-22146] Set the names of saved files
Parent: 98f006c

24 files changed: +264 −42 lines changed
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Use the `file_name_template` field to specify the names of saved files

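For context, file names are produced by formatting this template with str.format. A minimal sketch of how the default template resolves, assuming the "%Y_%m_%d_%H_%M_%S" timestamp format used by the file handler later in this commit (all values below are illustrative):

    template = "{run_created_at}-{index}.{extension}"
    file_name = template.format(
        run_created_at="2024_05_01_12_30_00",  # run.created_at, pre-formatted
        index=0,                               # position of the file in the directory listing
        extension="csv.gz",                    # format name plus optional compression suffix
    )
    print(file_name)  # 2024_05_01_12_30_00-0.csv.gz
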
syncmaster/dto/runs.py

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+from dataclasses import dataclass
+from datetime import datetime
+
+
+@dataclass
+class RunDTO:
+    id: str
+    created_at: datetime

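A minimal sketch of constructing this DTO, with illustrative values; in the worker it is built from run.id and run.created_at (see controller.py below):

    from datetime import datetime

    from syncmaster.dto.runs import RunDTO

    run_dto = RunDTO(id="42", created_at=datetime(2024, 5, 1, 12, 30, 0))
    print(run_dto.created_at.strftime("%Y_%m_%d_%H_%M_%S"))  # 2024_05_01_12_30_00
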
syncmaster/dto/transfers.py

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ class FileTransferDTO(TransferDTO):
     directory_path: str
     file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
     options: dict
+    file_name_template: str | None = None
     df_schema: dict | None = None
     transformations: list[dict] | None = None

syncmaster/schemas/v1/transfers/file/base.py

Lines changed: 4 additions & 1 deletion
@@ -63,7 +63,7 @@ class CreateFileTransferTarget(BaseModel):
         discriminator="type",
     )
     file_name_template: str = Field(
-        default="{run_created_at}_{index}.{extension}",
+        default="{run_created_at}-{index}.{extension}",
         description="Template for file naming with required placeholders 'index' and 'extension'",
     )
     options: dict[str, Any] = Field(default_factory=dict)
@@ -81,6 +81,9 @@ def _directory_path_is_valid_path(cls, value):
     @field_validator("file_name_template")
     @classmethod
     def validate_file_name_template(cls, value):
+        if ":" in value:
+            raise ValueError("The ':' symbol is not allowed by Spark in filenames")
+
         required_keys = {"index", "extension"}
         placeholders = {key for key in required_keys if f"{{{key}}}" in value}

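A standalone sketch of what this validator accepts and rejects, re-implemented outside Pydantic for illustration; the wording of the missing-placeholder error is assumed, since that branch sits outside this diff:

    def validate_file_name_template(value: str) -> str:
        # Spark cannot write files whose names contain ':'
        if ":" in value:
            raise ValueError("The ':' symbol is not allowed by Spark in filenames")

        required_keys = {"index", "extension"}
        placeholders = {key for key in required_keys if f"{{{key}}}" in value}
        if placeholders != required_keys:
            raise ValueError("Template must contain {index} and {extension}")  # assumed message
        return value

    validate_file_name_template("{run_created_at}-{index}.{extension}")  # passes
    # validate_file_name_template("{run_created_at}:{index}.{extension}")  # raises: ':' not allowed
    # validate_file_name_template("{run_created_at}-{index}")              # raises: no {extension}
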
syncmaster/worker/controller.py

Lines changed: 19 additions & 1 deletion
@@ -20,6 +20,7 @@
     SFTPConnectionDTO,
     WebDAVConnectionDTO,
 )
+from syncmaster.dto.runs import RunDTO
 from syncmaster.dto.transfers import (
     ClickhouseTransferDTO,
     FTPSTransferDTO,
@@ -60,66 +61,79 @@
         HiveHandler,
         HiveConnectionDTO,
         HiveTransferDTO,
+        RunDTO,
     ),
     "oracle": (
         OracleHandler,
         OracleConnectionDTO,
         OracleTransferDTO,
+        RunDTO,
     ),
     "clickhouse": (
         ClickhouseHandler,
         ClickhouseConnectionDTO,
         ClickhouseTransferDTO,
+        RunDTO,
     ),
     "mssql": (
         MSSQLHandler,
         MSSQLConnectionDTO,
         MSSQLTransferDTO,
+        RunDTO,
     ),
     "mysql": (
         MySQLHandler,
         MySQLConnectionDTO,
         MySQLTransferDTO,
+        RunDTO,
     ),
     "postgres": (
         PostgresHandler,
         PostgresConnectionDTO,
         PostgresTransferDTO,
+        RunDTO,
     ),
     "s3": (
         S3Handler,
         S3ConnectionDTO,
         S3TransferDTO,
+        RunDTO,
     ),
     "hdfs": (
         HDFSHandler,
         HDFSConnectionDTO,
         HDFSTransferDTO,
+        RunDTO,
     ),
     "sftp": (
         SFTPHandler,
         SFTPConnectionDTO,
         SFTPTransferDTO,
+        RunDTO,
     ),
     "ftp": (
         FTPHandler,
         FTPConnectionDTO,
         FTPTransferDTO,
+        RunDTO,
     ),
     "ftps": (
         FTPSHandler,
         FTPSConnectionDTO,
         FTPSTransferDTO,
+        RunDTO,
     ),
     "samba": (
         SambaHandler,
         SambaConnectionDTO,
         SambaTransferDTO,
+        RunDTO,
     ),
     "webdav": (
         WebDAVHandler,
         WebDAVConnectionDTO,
         WebDAVTransferDTO,
+        RunDTO,
     ),
 }

@@ -141,13 +155,15 @@ def __init__(
         self.run = run
         self.source_handler = self.get_handler(
             connection_data=source_connection.data,
+            run_data={"id": run.id, "created_at": run.created_at},
             transfer_params=run.transfer.source_params,
             transformations=run.transfer.transformations,
             connection_auth_data=source_auth_data,
             temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
         )
         self.target_handler = self.get_handler(
             connection_data=target_connection.data,
+            run_data={"id": run.id, "created_at": run.created_at},
             transfer_params=run.transfer.target_params,
             transformations=run.transfer.transformations,
             connection_auth_data=target_auth_data,
@@ -175,6 +191,7 @@ def get_handler(
         self,
         connection_data: dict[str, Any],
         connection_auth_data: dict,
+        run_data: dict[str, Any],
         transfer_params: dict[str, Any],
         transformations: list[dict],
         temp_dir: TemporaryDirectory,
@@ -186,10 +203,11 @@ def get_handler(
         if connection_handler_proxy.get(handler_type, None) is None:
             raise ConnectionTypeNotRecognizedError

-        handler, connection_dto, transfer_dto = connection_handler_proxy[handler_type]
+        handler, connection_dto, transfer_dto, run_dto = connection_handler_proxy[handler_type]

         return handler(
             connection_dto=connection_dto(**connection_data),
             transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
+            run_dto=run_dto(**run_data),
             temp_dir=temp_dir,
         )

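The registry change above turns every entry into a 4-tuple so that get_handler can build a RunDTO alongside the connection and transfer DTOs. A self-contained sketch of the dispatch pattern, with stand-in classes (the real registry maps handler types to the imported handlers and DTOs):

    from dataclasses import dataclass
    from datetime import datetime


    @dataclass
    class RunDTO:
        id: str
        created_at: datetime


    class DummyHandler:  # stand-in for the real handler classes
        def __init__(self, connection_dto, transfer_dto, run_dto, temp_dir=None):
            self.run_dto = run_dto


    registry = {
        # (handler class, connection DTO, transfer DTO, run DTO)
        "postgres": (DummyHandler, dict, dict, RunDTO),
    }

    handler_cls, connection_dto, transfer_dto, run_dto = registry["postgres"]
    handler = handler_cls(
        connection_dto=connection_dto(host="db"),        # stand-in for **connection_data
        transfer_dto=transfer_dto(table="public.users"),  # stand-in for **transfer_params
        run_dto=run_dto(id="42", created_at=datetime.now()),
    )
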
syncmaster/worker/handlers/base.py

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,7 @@
 from typing import TYPE_CHECKING

 from syncmaster.dto.connections import ConnectionDTO
+from syncmaster.dto.runs import RunDTO
 from syncmaster.dto.transfers import TransferDTO

 if TYPE_CHECKING:
@@ -20,10 +21,12 @@ def __init__(
         self,
         connection_dto: ConnectionDTO,
         transfer_dto: TransferDTO,
+        run_dto: RunDTO,
         temp_dir: TemporaryDirectory,
     ):
         self.connection_dto = connection_dto
         self.transfer_dto = transfer_dto
+        self.run_dto = run_dto
         self.temp_dir = temp_dir

     @abstractmethod

syncmaster/worker/handlers/file/base.py

Lines changed: 62 additions & 9 deletions
@@ -3,10 +3,12 @@

 from __future__ import annotations

+import os
+import shutil
 from typing import TYPE_CHECKING

 from onetl.base.base_file_df_connection import BaseFileDFConnection
-from onetl.file import FileDFReader, FileDFWriter
+from onetl.file import FileDFReader, FileDFWriter, FileMover

 from syncmaster.dto.connections import ConnectionDTO
 from syncmaster.dto.transfers import FileTransferDTO
@@ -17,7 +19,7 @@


 class FileHandler(Handler):
-    connection: BaseFileDFConnection
+    df_connection: BaseFileDFConnection
     connection_dto: ConnectionDTO
     transfer_dto: FileTransferDTO
     _operators = {
@@ -40,7 +42,7 @@ def read(self) -> DataFrame:
         from pyspark.sql.types import StructType

         reader = FileDFReader(
-            connection=self.connection,
+            connection=self.df_connection,
             format=self.transfer_dto.file_format,
             source_path=self.transfer_dto.directory_path,
             df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
@@ -59,14 +61,65 @@ def read(self) -> DataFrame:
         return df

     def write(self, df: DataFrame) -> None:
-        writer = FileDFWriter(
-            connection=self.connection,
-            format=self.transfer_dto.file_format,
-            target_path=self.transfer_dto.directory_path,
-            options=self.transfer_dto.options,
+        tmp_path = os.path.join(self.transfer_dto.directory_path, ".tmp", str(self.run_dto.id))
+        try:
+            writer = FileDFWriter(
+                connection=self.df_connection,
+                format=self.transfer_dto.file_format,
+                target_path=tmp_path,
+                options=self.transfer_dto.options,
+            )
+            writer.run(df=df)
+
+            self._rename_files(tmp_path)
+
+            mover = FileMover(
+                connection=self.file_connection,
+                source_path=tmp_path,
+                target_path=self.transfer_dto.directory_path,
+            )
+            mover.run()
+        finally:
+            shutil.rmtree(tmp_path, ignore_errors=True)
+
+    def _rename_files(self, tmp_path: str) -> None:
+        files = self.file_connection.list_dir(tmp_path)
+
+        for index, file_name in enumerate(files):
+            extension = self._get_file_extension(str(file_name))
+            new_name = self._get_file_name(str(index), extension)
+            old_path = os.path.join(tmp_path, file_name)
+            new_path = os.path.join(tmp_path, new_name)
+            self.file_connection.rename_file(old_path, new_path)
+
+    def _get_file_name(self, index: str, extension: str) -> str:
+        return self.transfer_dto.file_name_template.format(
+            index=index,
+            extension=extension,
+            run_id=self.run_dto.id,
+            run_created_at=self.run_dto.created_at.strftime("%Y_%m_%d_%H_%M_%S"),
         )

-        return writer.run(df=df)
+    def _get_file_extension(self, file_name: str) -> str:
+        extension = self.transfer_dto.file_format.name
+        parts = file_name.split(".")
+
+        if extension == "xml":  # spark-xml does not write any extension to files
+            if len(parts) <= 1:
+                return extension
+
+            compression = parts[-1]
+
+        else:
+            if len(parts) <= 2:
+                return extension
+
+            compression = parts[-1] if parts[-1] != extension else parts[-2]
+
+        if extension in ("parquet", "orc"):
+            return f"{compression}.{extension}"
+
+        return f"{extension}.{compression}"

     def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
         expressions = []

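The subtlest part of the new write() is _get_file_extension: Spark names its output files along the lines of part-00000.csv.gz or part-00000.snappy.parquet, the compression suffix can sit before or after the format suffix depending on the format, and spark-xml writes no format suffix at all. A standalone sketch of the same branching with worked inputs, parameterized on the format name instead of self.transfer_dto (the sample file names are illustrative):

    def get_file_extension(file_name: str, fmt: str) -> str:
        parts = file_name.split(".")

        if fmt == "xml":  # spark-xml writes no extension, only an optional compression suffix
            if len(parts) <= 1:
                return fmt
            compression = parts[-1]
        else:
            if len(parts) <= 2:
                return fmt
            compression = parts[-1] if parts[-1] != fmt else parts[-2]

        if fmt in ("parquet", "orc"):  # compression precedes the extension
            return f"{compression}.{fmt}"
        return f"{fmt}.{compression}"


    assert get_file_extension("part-0000.csv", "csv") == "csv"
    assert get_file_extension("part-0000.csv.gz", "csv") == "csv.gz"
    assert get_file_extension("part-0000.snappy.parquet", "parquet") == "snappy.parquet"
    assert get_file_extension("part-0000", "xml") == "xml"
    assert get_file_extension("part-0000.gz", "xml") == "xml.gz"
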
syncmaster/worker/handlers/file/ftp.py

Lines changed: 2 additions & 2 deletions
@@ -18,12 +18,12 @@ class FTPHandler(FileProtocolHandler):
     connection_dto: FTPConnectionDTO

     def connect(self, spark: SparkSession) -> None:
-        self.connection = FTP(
+        self.file_connection = FTP(
             host=self.connection_dto.host,
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
-        self.local_connection = SparkLocalFS(
+        self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()

syncmaster/worker/handlers/file/ftps.py

Lines changed: 2 additions & 2 deletions
@@ -18,12 +18,12 @@ class FTPSHandler(FileProtocolHandler):
     connection_dto: FTPSConnectionDTO

     def connect(self, spark: SparkSession) -> None:
-        self.connection = FTPS(
+        self.file_connection = FTPS(
             host=self.connection_dto.host,
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
         ).check()
-        self.local_connection = SparkLocalFS(
+        self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()

syncmaster/worker/handlers/file/hdfs.py

Lines changed: 7 additions & 2 deletions
@@ -5,7 +5,7 @@

 from typing import TYPE_CHECKING

-from onetl.connection import SparkHDFS
+from onetl.connection import HDFS, SparkHDFS

 from syncmaster.dto.connections import HDFSConnectionDTO
 from syncmaster.worker.handlers.file.base import FileHandler
@@ -18,7 +18,12 @@ class HDFSHandler(FileHandler):
     connection_dto: HDFSConnectionDTO

     def connect(self, spark: SparkSession):
-        self.connection = SparkHDFS(
+        self.df_connection = SparkHDFS(
             cluster=self.connection_dto.cluster,
             spark=spark,
         ).check()
+        self.file_connection = HDFS(
+            cluster=self.connection_dto.cluster,
+            user=self.connection_dto.user,
+            password=self.connection_dto.password,
+        ).check()

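HDFSHandler now holds two connections because the new write() needs both: the Spark-backed df_connection writes the DataFrame into the temporary directory, while the direct file_connection lists, renames, and moves the resulting files. A hedged sketch of that division of labor (requires an existing SparkSession and a reachable cluster; the cluster name and credentials are placeholders):

    from onetl.connection import HDFS, SparkHDFS

    # `spark` is assumed to be an existing SparkSession.
    # df_connection: DataFrame I/O through Spark (FileDFReader / FileDFWriter)
    df_connection = SparkHDFS(cluster="my-cluster", spark=spark).check()

    # file_connection: direct file operations (list_dir, rename_file, FileMover)
    file_connection = HDFS(cluster="my-cluster", user="someuser", password="***").check()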