Skip to content

Commit 41a2a62

Browse files
author
Ilyas Gasanov
committed
Review
1 parent 8435484 commit 41a2a62

File tree

8 files changed

+37
-28
lines changed

8 files changed

+37
-28
lines changed

.env.docker

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ TEST_SFTP_PORT_FOR_CONFTEST=2222
113113
TEST_SFTP_HOST_FOR_WORKER=test-sftp
114114
TEST_SFTP_PORT_FOR_WORKER=2222
115115
TEST_SFTP_USER=syncmaster
116-
TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
116+
TEST_SFTP_PASSWORD=test_only
117117

118118
SPARK_CONF_DIR=/app/tests/spark/hive/conf/
119119
HADOOP_CONF_DIR=/app/tests/spark/hadoop/

.env.local

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export TEST_SFTP_PORT_FOR_CONFTEST=2222
100100
export TEST_SFTP_HOST_FOR_WORKER=test-sftp
101101
export TEST_SFTP_PORT_FOR_WORKER=2222
102102
export TEST_SFTP_USER=syncmaster
103-
export TEST_SFTP_PASSWORD=AesujeifohgoaCu0Boosiet5aimeitho
103+
export TEST_SFTP_PASSWORD=test_only
104104

105105
export SPARK_CONF_DIR=./tests/spark/hive/conf/
106106
export HADOOP_CONF_DIR=./tests/spark/hadoop/

docker-compose.test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ services:
277277
USER_NAME: syncmaster
278278
PASSWORD_ACCESS: true
279279
SUDO_ACCESS: true
280-
USER_PASSWORD: AesujeifohgoaCu0Boosiet5aimeitho
280+
USER_PASSWORD: test_only
281281
profiles: [sftp, all]
282282

283283
volumes:

syncmaster/worker/controller.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
4+
import tempfile
5+
from pathlib import Path
46
from typing import Any
57

68
from syncmaster.db.models import Connection, Run
@@ -103,40 +105,48 @@ def __init__(
103105
target_connection: Connection,
104106
target_auth_data: dict,
105107
):
108+
self.temp_dir = tempfile.TemporaryDirectory(prefix="syncmaster_")
109+
106110
self.run = run
107111
self.source_handler = self.get_handler(
108112
connection_data=source_connection.data,
109113
transfer_params=run.transfer.source_params,
110114
transformations=run.transfer.transformations,
111115
connection_auth_data=source_auth_data,
116+
temp_dir=tempfile.TemporaryDirectory(dir=self.temp_dir.name, prefix="downloaded_"),
112117
)
113118
self.target_handler = self.get_handler(
114119
connection_data=target_connection.data,
115120
transfer_params=run.transfer.target_params,
116121
transformations=run.transfer.transformations,
117122
connection_auth_data=target_auth_data,
123+
temp_dir=tempfile.TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
118124
)
119125

120126
def perform_transfer(self, settings: WorkerAppSettings) -> None:
121-
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
122-
run=self.run,
123-
source=self.source_handler.connection_dto,
124-
target=self.target_handler.connection_dto,
125-
)
127+
try:
128+
spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
129+
run=self.run,
130+
source=self.source_handler.connection_dto,
131+
target=self.target_handler.connection_dto,
132+
)
126133

127-
with spark:
128-
self.source_handler.connect(spark)
129-
self.target_handler.connect(spark)
134+
with spark:
135+
self.source_handler.connect(spark)
136+
self.target_handler.connect(spark)
130137

131-
df = self.source_handler.read()
132-
self.target_handler.write(df)
138+
df = self.source_handler.read()
139+
self.target_handler.write(df)
140+
finally:
141+
self.temp_dir.cleanup()
133142

134143
def get_handler(
135144
self,
136145
connection_data: dict[str, Any],
137146
connection_auth_data: dict,
138147
transfer_params: dict[str, Any],
139148
transformations: list[dict],
149+
temp_dir: tempfile.TemporaryDirectory,
140150
) -> Handler:
141151
connection_data.update(connection_auth_data)
142152
connection_data.pop("type")
@@ -150,4 +160,5 @@ def get_handler(
150160
return handler(
151161
connection_dto=connection_dto(**connection_data),
152162
transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
163+
temp_dir=temp_dir,
153164
)

syncmaster/worker/handlers/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from __future__ import annotations
55

66
from abc import ABC, abstractmethod
7+
from pathlib import Path
78
from typing import TYPE_CHECKING
89

910
from syncmaster.dto.connections import ConnectionDTO
@@ -19,9 +20,11 @@ def __init__(
1920
self,
2021
connection_dto: ConnectionDTO,
2122
transfer_dto: TransferDTO,
23+
temp_dir: tempfile.TemporaryDirectory,
2224
):
2325
self.connection_dto = connection_dto
2426
self.transfer_dto = transfer_dto
27+
self.temp_dir = temp_dir
2528

2629
@abstractmethod
2730
def connect(self, spark: SparkSession) -> None: ...

syncmaster/worker/handlers/file/sftp.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def connect(self, spark: SparkSession) -> None:
2424
port=self.connection_dto.port,
2525
user=self.connection_dto.user,
2626
password=self.connection_dto.password,
27-
compress=False,
27+
compress=False, # disable SSH transport-level compression to avoid errors when transferring already-compressed files
2828
).check()
2929
self.local_connection = SparkLocalFS(
3030
spark=spark,
@@ -36,16 +36,14 @@ def read(self) -> DataFrame:
3636
downloader = FileDownloader(
3737
connection=self.connection,
3838
source_path=self.transfer_dto.directory_path,
39-
temp_path="/tmp/syncmaster",
40-
local_path="/tmp/syncmaster/sftp",
41-
options={"if_exists": "replace_entire_directory"},
39+
local_path=self.temp_dir.name,
4240
)
4341
downloader.run()
4442

4543
reader = FileDFReader(
4644
connection=self.local_connection,
4745
format=self.transfer_dto.file_format,
48-
source_path="/tmp/syncmaster/sftp",
46+
source_path=self.temp_dir.name,
4947
df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
5048
options=self.transfer_dto.options,
5149
)
@@ -65,16 +63,14 @@ def write(self, df: DataFrame) -> None:
6563
writer = FileDFWriter(
6664
connection=self.local_connection,
6765
format=self.transfer_dto.file_format,
68-
target_path="/tmp/syncmaster/sftp",
66+
target_path=self.temp_dir.name,
6967
options=self.transfer_dto.options,
7068
)
7169
writer.run(df=df)
7270

7371
uploader = FileUploader(
7472
connection=self.connection,
75-
local_path="/tmp/syncmaster/sftp",
76-
temp_path="/config/target", # SFTP host
73+
local_path=self.temp_dir.name,
7774
target_path=self.transfer_dto.directory_path,
78-
options={"if_exists": "replace_entire_directory"},
7975
)
8076
uploader.run()

tests/test_integration/test_run_transfer/connection_fixtures/sftp_fixtures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def sftp_file_connection(sftp_for_conftest):
8484
port=sftp_for_conftest.port,
8585
user=sftp_for_conftest.user,
8686
password=sftp_for_conftest.password,
87-
compress=False,
87+
compress=False, # disable SSH transport-level compression to avoid errors when transferring already-compressed files
8888
)
8989

9090

tests/test_integration/test_run_transfer/test_sftp.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import os
22
import secrets
3+
from pathlib import Path
34

45
import pytest
56
import pytest_asyncio
@@ -246,6 +247,7 @@ async def test_run_transfer_postgres_to_sftp(
246247
postgres_to_sftp: Transfer,
247248
target_file_format,
248249
file_format_flavor: str,
250+
tmp_path: Path,
249251
):
250252
format_name, format = target_file_format
251253

@@ -279,18 +281,15 @@ async def test_run_transfer_postgres_to_sftp(
279281
downloader = FileDownloader(
280282
connection=sftp_file_connection,
281283
source_path=f"/config/target/{format_name}/{file_format_flavor}",
282-
temp_path="/tmp/syncmaster",
283-
local_path="/tmp/syncmaster/sftp",
284-
options={"if_exists": "replace_entire_directory"},
284+
local_path=tmp_path,
285285
)
286286
downloader.run()
287287

288288
reader = FileDFReader(
289289
connection=sftp_file_df_connection,
290290
format=format,
291-
source_path="/tmp/syncmaster/sftp",
291+
source_path=tmp_path,
292292
df_schema=init_df.schema,
293-
options={},
294293
)
295294
df = reader.run()
296295

0 commit comments

Comments
 (0)