
Commit 6512a4a

[DOP-28167] Slots on worker handlers support inheritance

1 parent 916160c

17 files changed (+20 −135 lines)


syncmaster/worker/handlers/db/base.py

Lines changed: 4 additions & 0 deletions

@@ -8,6 +8,7 @@

 from onetl.base import BaseDBConnection
 from onetl.db import DBReader, DBWriter
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.transfers import DBTransferDTO
 from syncmaster.worker.handlers.base import Handler
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class DBHandler(Handler):
     connection: BaseDBConnection
     transfer_dto: DBTransferDTO
@@ -35,6 +37,7 @@ class DBHandler(Handler):
         "not_ilike": "NOT ILIKE",
     }

+    @slot
     def read(self) -> DataFrame:
         reader_params = {}
         if self.transfer_dto.strategy.type == "incremental":
@@ -58,6 +61,7 @@ def read(self) -> DataFrame:
         )
         return reader.run()

+    @slot
     def write(self, df: DataFrame) -> None:
         if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
             self.transfer_dto.options["if_exists"] = "append"
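
This base.py hunk is the heart of the change: @support_hooks and @slot now live once on DBHandler, and every subclass inherits read() and write() as slots. Below is a minimal sketch of that inheritance behavior, assuming onetl's documented hooks API (support_hooks, slot, hook, and binding a hook via the slot's .bind); BaseHandler, ChildHandler, and log_read are illustrative names, not the real syncmaster classes.

from onetl.hooks import hook, slot, support_hooks

@support_hooks
class BaseHandler:
    @slot
    def read(self):
        # shared base implementation
        return "rows"

class ChildHandler(BaseHandler):
    # No override and no re-decoration needed: read() is inherited together
    # with its slot. This is what "slots support inheritance" means here.
    pass

@BaseHandler.read.bind
@hook
def log_read(self):
    # Bound once on the base slot; with inheritance support it should also
    # fire when the method is called on a subclass instance.
    print(f"read() called on {type(self).__name__}")

ChildHandler().read()  # expected: log_read fires, then "rows" is returned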

syncmaster/worker/handlers/db/clickhouse.py

Lines changed: 1 addition & 6 deletions

@@ -7,7 +7,7 @@

 from onetl.connection import Clickhouse
 from onetl.db import DBWriter
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import slot

 from syncmaster.dto.connections import ClickhouseConnectionDTO
 from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -18,7 +18,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class ClickhouseHandler(DBHandler):
     connection: Clickhouse
     connection_dto: ClickhouseConnectionDTO
@@ -42,10 +41,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
     @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
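
ClickhouseHandler shows the other half of the pattern: the pure-delegation read() override is deleted, while write(), which adds column normalization, keeps its @slot marker even though the class-level @support_hooks is gone. A hedged sketch mirroring that shape (illustrative names; it assumes, as the diff implies, that a @slot on a subclass of a @support_hooks base class is picked up without re-decorating the subclass):

from onetl.hooks import slot, support_hooks

@support_hooks
class Base:
    @slot
    def write(self, df):
        print("base write")

class Child(Base):
    # The override changes behavior, so it is re-declared as a slot,
    # but no @support_hooks is needed on the subclass itself.
    @slot
    def write(self, df):
        print("normalize column names first")
        super().write(df)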

syncmaster/worker/handlers/db/hive.py

Lines changed: 0 additions & 4 deletions

@@ -38,10 +38,6 @@ def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()

-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/iceberg.py

Lines changed: 0 additions & 10 deletions

@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Iceberg
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
 from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class IcebergRESTCatalogS3Handler(DBHandler):
     connection: Iceberg
     connection_dto: IcebergRESTCatalogS3ConnectionDTO
@@ -51,14 +49,6 @@ def connect(self, spark: SparkSession):
             ),
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mssql.py

Lines changed: 0 additions & 10 deletions

@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MSSQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MSSQLHandler(DBHandler):
     connection: MSSQL
     connection_dto: MSSQLConnectionDTO
@@ -40,14 +38,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mysql.py

Lines changed: 0 additions & 10 deletions

@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MySQL
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class MySQLHandler(DBHandler):
     connection: MySQL
     connection_dto: MySQLConnectionDTO
@@ -37,14 +35,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/oracle.py

Lines changed: 1 addition & 9 deletions

@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Oracle
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
@@ -39,14 +39,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())

syncmaster/worker/handlers/db/postgres.py

Lines changed: 0 additions & 10 deletions

@@ -6,7 +6,6 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Postgres
-from onetl.hooks import slot, support_hooks

 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
@@ -17,7 +16,6 @@
     from pyspark.sql.dataframe import DataFrame


-@support_hooks
 class PostgresHandler(DBHandler):
     connection: Postgres
     connection_dto: PostgresConnectionDTO
@@ -38,14 +36,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/file/ftp.py

Lines changed: 1 addition & 12 deletions

@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPHandler(LocalDFFileHandler):
     connection_dto: FTPConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)

syncmaster/worker/handlers/file/ftps.py

Lines changed: 1 addition & 12 deletions

@@ -5,16 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
-
 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


-@support_hooks
 class FTPSHandler(LocalDFFileHandler):
     connection_dto: FTPSConnectionDTO

@@ -31,11 +28,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
