Skip to content

Commit 14414b6

Browse files
author
Ilyas Gasanov
committed
Review
1 parent 8435484 commit 14414b6

File tree

8 files changed

+31
-20
lines changed

8 files changed

+31
-20
lines changed

.env.docker

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
113113
TEST_SFTP_HOST_FOR_WORKER=test-sftp
114114
TEST_SFTP_PORT_FOR_WORKER=2222
115115
TEST_SFTP_USER=syncmaster
116-
TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
116+
TEST_SFTP_PASSWORD=test_only
117117

118118
SPARK_CONF_DIR=/app/tests/spark/hive/conf/
119119
HADOOP_CONF_DIR=/app/tests/spark/hadoop/

.env.local

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
100100
export TEST_SFTP_HOST_FOR_WORKER=test-sftp
101101
export TEST_SFTP_PORT_FOR_WORKER=2222
102102
export TEST_SFTP_USER=syncmaster
103-
export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
103+
export TEST_SFTP_PASSWORD=test_only
104104

105105
export SPARK_CONF_DIR=./tests/spark/hive/conf/
106106
export HADOOP_CONF_DIR=./tests/spark/hadoop/

docker-compose.test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ services:
277277
USER_NAME: syncmaster
278278
PASSWORD_ACCESS: true
279279
SUDO_ACCESS: true
280-
USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
280+
USER_PASSWORD: test_only
281281
profiles: [sftp, all]
282282

283283
volumes:

syncmaster/dto/transfers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# SPDX-License-Identifier: Apache-2.0
33
import json
44
from dataclasses import dataclass
5+
from pathlib import Path
56
from typing import ClassVar
67

78
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
@@ -98,3 +99,4 @@ class HDFSTransferDTO(FileTransferDTO):
9899
@dataclass
99100
class SFTPTransferDTO(FileTransferDTO):
100101
type: ClassVar[str] = "sftp"
102+
temp_worker_directory_path: Path | None = None

syncmaster/worker/controller.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
4+
import tempfile
45
from typing import Any
56

67
from syncmaster.db.models import Connection, Run
@@ -128,8 +129,21 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None:
128129
self.source_handler.connect(spark)
129130
self.target_handler.connect(spark)
130131

131-
df = self.source_handler.read()
132-
self.target_handler.write(df)
132+
source_needs_temp_dir = self.source_handler.transfer_dto.type == "sftp"
133+
target_needs_temp_dir = self.target_handler.transfer_dto.type == "sftp"
134+
135+
if source_needs_temp_dir or target_needs_temp_dir:
136+
with tempfile.TemporaryDirectory(prefix="syncmaster_") as temp_dir:
137+
if source_needs_temp_dir:
138+
self.source_handler.transfer_dto.temp_worker_directory_path = temp_dir
139+
if target_needs_temp_dir:
140+
self.target_handler.transfer_dto.temp_worker_directory_path = temp_dir
141+
142+
df = self.source_handler.read()
143+
self.target_handler.write(df)
144+
else:
145+
df = self.source_handler.read()
146+
self.target_handler.write(df)
133147

134148
def get_handler(
135149
self,

syncmaster/worker/handlers/file/sftp.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None:
2424
port=self.connection_dto.port,
2525
user=self.connection_dto.user,
2626
password=self.connection_dto.password,
27-
compress=False,
27+
compress=False, # to avoid errors from combining file and SCP-level compression
2828
).check()
2929
self.local_connection = SparkLocalFS(
3030
spark=spark,
@@ -36,16 +36,14 @@ def read(self) -> DataFrame:
3636
downloader = FileDownloader(
3737
connection=self.connection,
3838
source_path=self.transfer_dto.directory_path,
39-
temp_path="/tmp/syncmaster",
40-
local_path="/tmp/syncmaster/sftp",
41-
options={"if_exists": "replace_entire_directory"},
39+
local_path=self.transfer_dto.temp_worker_directory_path,
4240
)
4341
downloader.run()
4442

4543
reader = FileDFReader(
4644
connection=self.local_connection,
4745
format=self.transfer_dto.file_format,
48-
source_path="/tmp/syncmaster/sftp",
46+
source_path=self.transfer_dto.temp_worker_directory_path,
4947
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
5048
options=self.transfer_dto.options,
5149
)
@@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None:
6563
writer = FileDFWriter(
6664
connection=self.local_connection,
6765
format=self.transfer_dto.file_format,
68-
target_path="/tmp/syncmaster/sftp",
66+
target_path=self.transfer_dto.temp_worker_directory_path,
6967
options=self.transfer_dto.options,
7068
)
7169
writer.run(df=df)
7270

7371
uploader = FileUploader(
7472
connection=self.connection,
75-
local_path="/tmp/syncmaster/sftp",
76-
temp_path="/config/target", # SFTP host
73+
local_path=self.transfer_dto.temp_worker_directory_path,
7774
target_path=self.transfer_dto.directory_path,
78-
options={"if_exists": "replace_entire_directory"},
7975
)
8076
uploader.run()

tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
8484
port=sftp_for_conftest.port,
8585
user=sftp_for_conftest.user,
8686
password=sftp_for_conftest.password,
87-
compress=False,
87+
compress=False, # to avoid errors from combining file and SCP-level compression
8888
)
8989

9090

tests/test_integration/test_run_transfer/test_sftp.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import secrets
3+
from pathlib import Path
34

45
import pytest
56
import pytest_asyncio
@@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
246247
postgres_to_sftp: Transfer,
247248
target_file_format,
248249
file_format_flavor: str,
250+
tmp_path: Path,
249251
):
250252
format_name, format = target_file_format
251253

@@ -279,18 +281,15 @@ async def test_run_transfer_postgres_to_sftp(
279281
downloader = FileDownloader(
280282
connection=sftp_file_connection,
281283
source_path=f"/config/target/{format_name}/{file_format_flavor}",
282-
temp_path="/tmp/syncmaster",
283-
local_path="/tmp/syncmaster/sftp",
284-
options={"if_exists": "replace_entire_directory"},
284+
local_path=tmp_path,
285285
)
286286
downloader.run()
287287

288288
reader = FileDFReader(
289289
connection=sftp_file_df_connection,
290290
format=format,
291-
source_path="/tmp/syncmaster/sftp",
291+
source_path=tmp_path,
292292
df_schema=init_df.schema,
293-
options={},
294293
)
295294
df = reader.run()
296295

0 commit comments

Comments
 (0)