syncmaster/worker/handlers: 17 files changed, +19 -136 lines

DBHandler (base class):
 from onetl.base import BaseDBConnection
 from onetl.db import DBReader, DBWriter
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.transfers import DBTransferDTO
 from syncmaster.worker.handlers.base import Handler
…
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class DBHandler(Handler):
     connection: BaseDBConnection
     transfer_dto: DBTransferDTO
@@ -35,6 +37,7 @@ class DBHandler(Handler):
         "not_ilike": "NOT ILIKE",
     }

+    @slot
     def read(self) -> DataFrame:
         reader_params = {}
         if self.transfer_dto.strategy.type == "incremental":
@@ -58,6 +61,7 @@ def read(self) -> DataFrame:
         )
         return reader.run()

+    @slot
     def write(self, df: DataFrame) -> None:
         if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
             self.transfer_dto.options["if_exists"] = "append"
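For context: these decorators come from onetl's hooks mechanism. Marking a class with @support_hooks and a method with @slot lets external code attach callbacks that fire whenever the method runs. Below is a minimal sketch, using a hypothetical ExampleHandler rather than the real handler classes, and assuming onetl's documented hook/bind API:

from onetl.hooks import hook, slot, support_hooks

@support_hooks
class ExampleHandler:  # hypothetical class, not part of syncmaster
    @slot
    def read(self):
        # stand-in for the real DBReader-based implementation
        return "rows"

# attach a callback to the slot; it runs on every read() call
@ExampleHandler.read.bind
@hook
def log_read(self):
    print("read() was called")

ExampleHandler().read()  # prints "read() was called", then returns "rows"

Moving @slot onto DBHandler.read and DBHandler.write means hooks can presumably be bound once, on the base class, instead of being re-declared in every subclass.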
ClickhouseHandler:
 from onetl.connection import Clickhouse
 from onetl.db import DBWriter
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import slot

 from syncmaster.dto.connections import ClickhouseConnectionDTO
 from syncmaster.dto.transfers import ClickhouseTransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class ClickhouseHandler(DBHandler):
     connection: Clickhouse
     connection_dto: ClickhouseConnectionDTO
@@ -42,10 +41,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
     @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
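The removed read() above was a pure pass-through to super(), while write() keeps @slot because it adds real behavior (column-name normalization) on top of the base implementation. The pass-throughs presumably became removable because a subclass that does not override a slot method inherits the decorated base method unchanged, so hooks bound to the base slot still fire for it. A toy sketch of that behavior, again with hypothetical class names:

from onetl.hooks import hook, slot, support_hooks

@support_hooks
class ExampleBase:  # hypothetical stand-in for DBHandler
    @slot
    def read(self):
        return "rows"

class ExampleChild(ExampleBase):
    pass  # no override: inherits the @slot-decorated method as-is

@ExampleBase.read.bind
@hook
def on_read(self):
    print(f"read() called on {type(self).__name__}")

ExampleChild().read()  # hook fires: "read() called on ExampleChild"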
HiveHandler:
@@ -38,10 +38,6 @@ def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()

-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
IcebergRESTCatalogS3Handler:
 from typing import TYPE_CHECKING

 from onetl.connection import Iceberg
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
 from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class IcebergRESTCatalogS3Handler(DBHandler):
     connection: Iceberg
     connection_dto: IcebergRESTCatalogS3ConnectionDTO
@@ -51,14 +49,6 @@ def connect(self, spark: SparkSession):
             ),
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
MSSQLHandler:
 from typing import TYPE_CHECKING

 from onetl.connection import MSSQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MSSQLHandler(DBHandler):
     connection: MSSQL
     connection_dto: MSSQLConnectionDTO
@@ -40,14 +38,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
MySQLHandler:
 from typing import TYPE_CHECKING

 from onetl.connection import MySQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MySQLHandler(DBHandler):
     connection: MySQL
     connection_dto: MySQLConnectionDTO
@@ -37,14 +35,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
OracleHandler:
 from typing import TYPE_CHECKING

 from onetl.connection import Oracle
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class OracleHandler(DBHandler):
     connection: Oracle
     connection_dto: OracleConnectionDTO
@@ -39,14 +37,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())
PostgresHandler:
 from typing import TYPE_CHECKING

 from onetl.connection import Postgres
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
…
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class PostgresHandler(DBHandler):
     connection: Postgres
     connection_dto: PostgresConnectionDTO
@@ -38,14 +36,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())
FTPHandler:
 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPHandler(LocalDFFileHandler):
     connection_dto: FTPConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
FTPSHandler:
 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPSHandler(LocalDFFileHandler):
     connection_dto: FTPSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)