Commit 384b018

[DOP-28167] Slots on worker handlers support inheritance

1 parent 916160c commit 384b018

17 files changed: +29 -118 lines changed

syncmaster/worker/handlers/db/base.py

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,7 @@

 from onetl.base import BaseDBConnection
 from onetl.db import DBReader, DBWriter
+from onetl.hooks import slot, support_hooks

 from syncmaster.dto.transfers import DBTransferDTO
 from syncmaster.worker.handlers.base import Handler
@@ -16,6 +17,7 @@
     from pyspark.sql.dataframe import DataFrame


+@support_hooks
 class DBHandler(Handler):
     connection: BaseDBConnection
     transfer_dto: DBTransferDTO
@@ -35,6 +37,7 @@ class DBHandler(Handler):
         "not_ilike": "NOT ILIKE",
     }

+    @slot
     def read(self) -> DataFrame:
         reader_params = {}
         if self.transfer_dto.strategy.type == "incremental":
@@ -58,6 +61,7 @@ def read(self) -> DataFrame:
         )
         return reader.run()

+    @slot
     def write(self, df: DataFrame) -> None:
         if self.transfer_dto.strategy.type == "incremental" and self.hwm and self.hwm.value:
             self.transfer_dto.options["if_exists"] = "append"
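
The point of the change: with @support_hooks/@slot now supporting inheritance, a hook bound to a slot on the base handler also fires when the method is called on a subclass, which is what makes the pass-through overrides deleted in the files below redundant. A minimal sketch of the mechanism, assuming onetl's documented hook/bind decorators and using hypothetical class names:

    from onetl.hooks import hook, slot, support_hooks


    @support_hooks
    class BaseHandler:
        @slot
        def read(self):
            return ["row"]

        @slot
        def write(self, data):
            print(f"writing {data}")


    @support_hooks
    class PostgresLikeHandler(BaseHandler):
        pass  # inherits both slots; no pass-through @slot overrides needed


    @BaseHandler.read.bind
    @hook
    def log_read(handler):
        # With slot inheritance, a hook bound once on the base class
        # also fires when read() is called on any subclass.
        print(f"read called on {type(handler).__name__}")


    PostgresLikeHandler().read()  # the hook fires: "read called on PostgresLikeHandler"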

syncmaster/worker/handlers/db/clickhouse.py

Lines changed: 0 additions & 4 deletions
@@ -42,10 +42,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
     @slot
     def write(self, df: DataFrame) -> None:
         normalized_df = self._normalize_column_names(df)
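
Only the pass-through read override is removed here; write keeps its @slot because it adds real behavior (column-name normalization) before delegating to the base implementation. Continuing the sketch above with a hypothetical subclass:

    @support_hooks
    class ClickhouseLikeHandler(BaseHandler):
        @slot  # an override that adds behavior keeps its own slot
        def write(self, data):
            normalized = [item.lower() for item in data]  # stand-in for column-name normalization
            super().write(normalized)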

syncmaster/worker/handlers/db/hive.py

Lines changed: 0 additions & 4 deletions
@@ -38,10 +38,6 @@ def read(self) -> DataFrame:
         self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
         return super().read()

-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/iceberg.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Iceberg
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
 from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
@@ -51,14 +51,6 @@ def connect(self, spark: SparkSession):
             ),
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mssql.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MSSQL
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import MSSQLConnectionDTO
 from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -40,14 +40,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mysql.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import MySQL
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import MySQLConnectionDTO
 from syncmaster.dto.transfers import MySQLTransferDTO
@@ -37,14 +37,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/oracle.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Oracle
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import OracleConnectionDTO
 from syncmaster.dto.transfers import OracleTransferDTO
@@ -39,14 +39,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.upper())

syncmaster/worker/handlers/db/postgres.py

Lines changed: 1 addition & 9 deletions
@@ -6,7 +6,7 @@
 from typing import TYPE_CHECKING

 from onetl.connection import Postgres
-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import PostgresConnectionDTO
 from syncmaster.dto.transfers import PostgresTransferDTO
@@ -38,14 +38,6 @@ def connect(self, spark: SparkSession):
             spark=spark,
         ).check()

-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)
-
     def _normalize_column_names(self, df: DataFrame) -> DataFrame:
         for column_name in df.columns:
             df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/file/ftp.py

Lines changed: 2 additions & 10 deletions
@@ -5,13 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import FTPConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


 @support_hooks
@@ -31,11 +31,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)

syncmaster/worker/handlers/file/ftps.py

Lines changed: 2 additions & 10 deletions
@@ -5,13 +5,13 @@

 from typing import TYPE_CHECKING

-from onetl.hooks import slot, support_hooks
+from onetl.hooks import support_hooks

 from syncmaster.dto.connections import FTPSConnectionDTO
 from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler

 if TYPE_CHECKING:
-    from pyspark.sql import DataFrame, SparkSession
+    from pyspark.sql import SparkSession


 @support_hooks
@@ -31,11 +31,3 @@ def connect(self, spark: SparkSession) -> None:
         self.local_df_connection = SparkLocalFS(
             spark=spark,
         ).check()
-
-    @slot
-    def read(self) -> DataFrame:
-        return super().read()
-
-    @slot
-    def write(self, df: DataFrame) -> None:
-        return super().write(df)

0 commit comments