 
 from __future__ import annotations
 
+import tempfile
 from typing import TYPE_CHECKING
 
 from onetl.connection import SFTP, SparkLocalFS
@@ -24,7 +25,7 @@ def connect(self, spark: SparkSession) -> None:
             port=self.connection_dto.port,
             user=self.connection_dto.user,
             password=self.connection_dto.password,
-            compress=False,
+            compress=False,  # to avoid errors from combining file and SCP-level compression
         ).check()
         self.local_connection = SparkLocalFS(
             spark=spark,
@@ -33,23 +34,23 @@ def connect(self, spark: SparkSession) -> None:
     def read(self) -> DataFrame:
         from pyspark.sql.types import StructType
 
-        downloader = FileDownloader(
-            connection=self.connection,
-            source_path=self.transfer_dto.directory_path,
-            temp_path="/tmp/syncmaster",
-            local_path="/tmp/syncmaster/sftp",
-            options={"if_exists": "replace_entire_directory"},
-        )
-        downloader.run()
-
-        reader = FileDFReader(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            source_path="/tmp/syncmaster/sftp",
-            df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
-            options=self.transfer_dto.options,
-        )
-        df = reader.run()
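+        # download the remote files into a short-lived local directory, then read them with Spark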
+        with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
+            downloader = FileDownloader(
+                connection=self.connection,
+                source_path=self.transfer_dto.directory_path,
+                local_path=temp_dir,
+            )
+            downloader.run()
+
+            reader = FileDFReader(
+                connection=self.local_connection,
+                format=self.transfer_dto.file_format,
+                source_path=temp_dir,
+                df_schema=StructType.fromJson(self.transfer_dto.df_schema) if self.transfer_dto.df_schema else None,
+                options=self.transfer_dto.options,
+            )
+            df = reader.run()
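+            # materialize the data before the temporary directory is cleaned up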
+            df.cache().count()
 
         rows_filter_expression = self._get_rows_filter_expression()
         if rows_filter_expression:
@@ -62,19 +63,18 @@ def read(self) -> DataFrame:
         return df
 
     def write(self, df: DataFrame) -> None:
-        writer = FileDFWriter(
-            connection=self.local_connection,
-            format=self.transfer_dto.file_format,
-            target_path="/tmp/syncmaster/sftp",
-            options=self.transfer_dto.options,
-        )
-        writer.run(df=df)
-
-        uploader = FileUploader(
-            connection=self.connection,
-            local_path="/tmp/syncmaster/sftp",
-            temp_path="/config/target",  # SFTP host
-            target_path=self.transfer_dto.directory_path,
-            options={"if_exists": "replace_entire_directory"},
-        )
-        uploader.run()
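+        # write the dataframe to a local temporary directory, then upload the files to the SFTP server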
+        with tempfile.TemporaryDirectory(prefix="syncmaster_sftp_") as temp_dir:
+            writer = FileDFWriter(
+                connection=self.local_connection,
+                format=self.transfer_dto.file_format,
+                target_path=temp_dir,
+                options=self.transfer_dto.options,
+            )
+            writer.run(df=df)
+
+            uploader = FileUploader(
+                connection=self.connection,
+                local_path=temp_dir,
+                target_path=self.transfer_dto.directory_path,
+            )
+            uploader.run()