File tree Expand file tree Collapse file tree 6 files changed +18
-11
lines changed
syncmaster/worker/handlers/file Expand file tree Collapse file tree 6 files changed +18
-11
lines changed Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import FTP , SparkLocalFS
9-
108from syncmaster .dto .connections import FTPConnectionDTO
119from syncmaster .worker .handlers .file .local_df import LocalDFFileHandler
1210
@@ -18,12 +16,15 @@ class FTPHandler(LocalDFFileHandler):
1816 connection_dto : FTPConnectionDTO
1917
2018 def connect (self , spark : SparkSession ) -> None :
19+ from onetl .connection import FTP , SparkLocalFS
20+
2121 self .file_connection = FTP (
2222 host = self .connection_dto .host ,
2323 port = self .connection_dto .port ,
2424 user = self .connection_dto .user ,
2525 password = self .connection_dto .password ,
2626 ).check ()
27+
2728 self .local_df_connection = SparkLocalFS (
2829 spark = spark ,
2930 ).check ()
Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import FTPS , SparkLocalFS
9-
108from syncmaster .dto .connections import FTPSConnectionDTO
119from syncmaster .worker .handlers .file .local_df import LocalDFFileHandler
1210
@@ -18,12 +16,15 @@ class FTPSHandler(LocalDFFileHandler):
1816 connection_dto : FTPSConnectionDTO
1917
2018 def connect (self , spark : SparkSession ) -> None :
19+ from onetl .connection import FTPS , SparkLocalFS
20+
2121 self .file_connection = FTPS (
2222 host = self .connection_dto .host ,
2323 port = self .connection_dto .port ,
2424 user = self .connection_dto .user ,
2525 password = self .connection_dto .password ,
2626 ).check ()
27+
2728 self .local_df_connection = SparkLocalFS (
2829 spark = spark ,
2930 ).check ()
Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import HDFS , SparkHDFS
9-
108from syncmaster .dto .connections import HDFSConnectionDTO
119from syncmaster .worker .handlers .file .remote_df import RemoteDFFileHandler
1210
@@ -18,10 +16,13 @@ class HDFSHandler(RemoteDFFileHandler):
1816 connection_dto : HDFSConnectionDTO
1917
2018 def connect (self , spark : SparkSession ):
19+ from onetl .connection import HDFS , SparkHDFS
20+
2121 self .df_connection = SparkHDFS (
2222 cluster = self .connection_dto .cluster ,
2323 spark = spark ,
2424 ).check ()
25+
2526 self .file_connection = HDFS (
2627 cluster = self .connection_dto .cluster ,
2728 ).check ()
Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import S3 , SparkS3
98from onetl .file import FileDFReader
109
1110from syncmaster .dto .connections import S3ConnectionDTO
@@ -19,6 +18,8 @@ class S3Handler(RemoteDFFileHandler):
1918 connection_dto : S3ConnectionDTO
2019
2120 def connect (self , spark : SparkSession ):
21+ from onetl .connection import S3 , SparkS3
22+
2223 self .df_connection = SparkS3 (
2324 host = self .connection_dto .host ,
2425 port = self .connection_dto .port ,
@@ -30,6 +31,7 @@ def connect(self, spark: SparkSession):
3031 extra = self .connection_dto .additional_params ,
3132 spark = spark ,
3233 ).check ()
34+
3335 self .file_connection = S3 (
3436 host = self .connection_dto .host ,
3537 port = self .connection_dto .port ,
Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import Samba , SparkLocalFS
9-
108from syncmaster .dto .connections import SambaConnectionDTO
119from syncmaster .worker .handlers .file .local_df import LocalDFFileHandler
1210
@@ -18,6 +16,8 @@ class SambaHandler(LocalDFFileHandler):
1816 connection_dto : SambaConnectionDTO
1917
2018 def connect (self , spark : SparkSession ) -> None :
19+ from onetl .connection import Samba , SparkLocalFS
20+
2121 self .file_connection = Samba (
2222 host = self .connection_dto .host ,
2323 port = self .connection_dto .port ,
@@ -28,6 +28,7 @@ def connect(self, spark: SparkSession) -> None:
2828 password = self .connection_dto .password ,
2929 auth_type = self .connection_dto .auth_type ,
3030 ).check ()
31+
3132 self .local_df_connection = SparkLocalFS (
3233 spark = spark ,
3334 ).check ()
Original file line number Diff line number Diff line change 55
66from typing import TYPE_CHECKING
77
8- from onetl .connection import SFTP , SparkLocalFS
9-
108from syncmaster .dto .connections import SFTPConnectionDTO
119from syncmaster .worker .handlers .file .local_df import LocalDFFileHandler
1210
@@ -18,13 +16,16 @@ class SFTPHandler(LocalDFFileHandler):
1816 connection_dto : SFTPConnectionDTO
1917
2018 def connect (self , spark : SparkSession ) -> None :
19+ from onetl .connection import SFTP , SparkLocalFS
20+
2121 self .file_connection = SFTP (
2222 host = self .connection_dto .host ,
2323 port = self .connection_dto .port ,
2424 user = self .connection_dto .user ,
2525 password = self .connection_dto .password ,
2626 compress = False , # to avoid errors from combining file and SCP-level compression
2727 ).check ()
28+
2829 self .local_df_connection = SparkLocalFS (
2930 spark = spark ,
3031 ).check ()
You can’t perform that action at this time.
0 commit comments