Skip to content

Commit c98f34b

Browse files
marashka (Marat Akhmetov)
and one other contributor authored
[DOP-28167] Added slots for data-rentgen plugin integration (#279)
* [DOP-28167] Added slots for data-rentgen plugin integration * [DOP-28167] added changelog --------- Co-authored-by: Marat Akhmetov <[email protected]>
1 parent ee3ffb5 commit c98f34b

File tree

15 files changed

+137
-6
lines changed

15 files changed

+137
-6
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Added slots for data-rentgen plugin integration
2+
-- by :github:user:`marashka`

syncmaster/worker/controller.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from etl_entities.hwm_store import BaseHWMStore
88
from horizon.client.auth import LoginPassword
99
from horizon_hwm_store import HorizonHWMStore
10+
from onetl.hooks import slot, support_hooks
1011
from onetl.strategy import IncrementalStrategy
1112

1213
from syncmaster.db.models import Connection, Run
@@ -146,6 +147,7 @@
146147
}
147148

148149

150+
@support_hooks
149151
class TransferController:
150152
settings: WorkerAppSettings
151153
source_handler: Handler
@@ -187,6 +189,7 @@ def __init__(
187189
temp_dir=TemporaryDirectory(dir=self.temp_dir.name, prefix="written_"),
188190
)
189191

192+
@slot
190193
def perform_transfer(self) -> None:
191194
try:
192195
spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(

syncmaster/worker/handlers/db/clickhouse.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from onetl.connection import Clickhouse
99
from onetl.db import DBWriter
10+
from onetl.hooks import slot, support_hooks
1011

1112
from syncmaster.dto.connections import ClickhouseConnectionDTO
1213
from syncmaster.dto.transfers import ClickhouseTransferDTO
@@ -17,6 +18,7 @@
1718
from pyspark.sql.dataframe import DataFrame
1819

1920

21+
@support_hooks
2022
class ClickhouseHandler(DBHandler):
2123
connection: Clickhouse
2224
connection_dto: ClickhouseConnectionDTO
@@ -40,6 +42,11 @@ def connect(self, spark: SparkSession):
4042
spark=spark,
4143
).check()
4244

45+
@slot
46+
def read(self) -> DataFrame:
47+
return super().read()
48+
49+
@slot
4350
def write(self, df: DataFrame) -> None:
4451
normalized_df = self._normalize_column_names(df)
4552
sort_column = next(

syncmaster/worker/handlers/db/hive.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from onetl.connection import Hive
9+
from onetl.hooks import slot, support_hooks
910

1011
from syncmaster.dto.connections import HiveConnectionDTO
1112
from syncmaster.dto.transfers import HiveTransferDTO
@@ -16,6 +17,7 @@
1617
from pyspark.sql.dataframe import DataFrame
1718

1819

20+
@support_hooks
1921
class HiveHandler(DBHandler):
2022
connection: Hive
2123
connection_dto: HiveConnectionDTO
@@ -31,10 +33,15 @@ def connect(self, spark: SparkSession):
3133
spark=spark,
3234
).check()
3335

36+
@slot
3437
def read(self) -> DataFrame:
3538
self.connection.spark.catalog.refreshTable(self.transfer_dto.table_name)
3639
return super().read()
3740

41+
@slot
42+
def write(self, df: DataFrame) -> None:
43+
return super().write(df)
44+
3845
def _normalize_column_names(self, df: DataFrame) -> DataFrame:
3946
for column_name in df.columns:
4047
df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mssql.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from onetl.connection import MSSQL
9+
from onetl.hooks import slot, support_hooks
910

1011
from syncmaster.dto.connections import MSSQLConnectionDTO
1112
from syncmaster.dto.transfers import MSSQLTransferDTO
@@ -16,6 +17,7 @@
1617
from pyspark.sql.dataframe import DataFrame
1718

1819

20+
@support_hooks
1921
class MSSQLHandler(DBHandler):
2022
connection: MSSQL
2123
connection_dto: MSSQLConnectionDTO
@@ -38,6 +40,14 @@ def connect(self, spark: SparkSession):
3840
spark=spark,
3941
).check()
4042

43+
@slot
44+
def read(self) -> DataFrame:
45+
return super().read()
46+
47+
@slot
48+
def write(self, df: DataFrame) -> None:
49+
return super().write(df)
50+
4151
def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4252
for column_name in df.columns:
4353
df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/mysql.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from onetl.connection import MySQL
9+
from onetl.hooks import slot, support_hooks
910

1011
from syncmaster.dto.connections import MySQLConnectionDTO
1112
from syncmaster.dto.transfers import MySQLTransferDTO
@@ -16,6 +17,7 @@
1617
from pyspark.sql.dataframe import DataFrame
1718

1819

20+
@support_hooks
1921
class MySQLHandler(DBHandler):
2022
connection: MySQL
2123
connection_dto: MySQLConnectionDTO
@@ -35,6 +37,14 @@ def connect(self, spark: SparkSession):
3537
spark=spark,
3638
).check()
3739

40+
@slot
41+
def read(self) -> DataFrame:
42+
return super().read()
43+
44+
@slot
45+
def write(self, df: DataFrame) -> None:
46+
return super().write(df)
47+
3848
def _normalize_column_names(self, df: DataFrame) -> DataFrame:
3949
for column_name in df.columns:
4050
df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/db/oracle.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from onetl.connection import Oracle
9+
from onetl.hooks import slot, support_hooks
910

1011
from syncmaster.dto.connections import OracleConnectionDTO
1112
from syncmaster.dto.transfers import OracleTransferDTO
@@ -16,6 +17,7 @@
1617
from pyspark.sql.dataframe import DataFrame
1718

1819

20+
@support_hooks
1921
class OracleHandler(DBHandler):
2022
connection: Oracle
2123
connection_dto: OracleConnectionDTO
@@ -37,6 +39,14 @@ def connect(self, spark: SparkSession):
3739
spark=spark,
3840
).check()
3941

42+
@slot
43+
def read(self) -> DataFrame:
44+
return super().read()
45+
46+
@slot
47+
def write(self, df: DataFrame) -> None:
48+
return super().write(df)
49+
4050
def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4151
for column_name in df.columns:
4252
df = df.withColumnRenamed(column_name, column_name.upper())

syncmaster/worker/handlers/db/postgres.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import TYPE_CHECKING
77

88
from onetl.connection import Postgres
9+
from onetl.hooks import slot, support_hooks
910

1011
from syncmaster.dto.connections import PostgresConnectionDTO
1112
from syncmaster.dto.transfers import PostgresTransferDTO
@@ -16,6 +17,7 @@
1617
from pyspark.sql.dataframe import DataFrame
1718

1819

20+
@support_hooks
1921
class PostgresHandler(DBHandler):
2022
connection: Postgres
2123
connection_dto: PostgresConnectionDTO
@@ -36,6 +38,14 @@ def connect(self, spark: SparkSession):
3638
spark=spark,
3739
).check()
3840

41+
@slot
42+
def read(self) -> DataFrame:
43+
return super().read()
44+
45+
@slot
46+
def write(self, df: DataFrame) -> None:
47+
return super().write(df)
48+
3949
def _normalize_column_names(self, df: DataFrame) -> DataFrame:
4050
for column_name in df.columns:
4151
df = df.withColumnRenamed(column_name, column_name.lower())

syncmaster/worker/handlers/file/ftp.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55

66
from typing import TYPE_CHECKING
77

8+
from onetl.hooks import slot, support_hooks
9+
810
from syncmaster.dto.connections import FTPConnectionDTO
911
from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
1012

1113
if TYPE_CHECKING:
12-
from pyspark.sql import SparkSession
14+
from pyspark.sql import DataFrame, SparkSession
1315

1416

17+
@support_hooks
1518
class FTPHandler(LocalDFFileHandler):
1619
connection_dto: FTPConnectionDTO
1720

@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
2831
self.local_df_connection = SparkLocalFS(
2932
spark=spark,
3033
).check()
34+
35+
@slot
36+
def read(self) -> DataFrame:
37+
return super().read()
38+
39+
@slot
40+
def write(self, df: DataFrame) -> None:
41+
return super().write(df)

syncmaster/worker/handlers/file/ftps.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,16 @@
55

66
from typing import TYPE_CHECKING
77

8+
from onetl.hooks import slot, support_hooks
9+
810
from syncmaster.dto.connections import FTPSConnectionDTO
911
from syncmaster.worker.handlers.file.local_df import LocalDFFileHandler
1012

1113
if TYPE_CHECKING:
12-
from pyspark.sql import SparkSession
14+
from pyspark.sql import DataFrame, SparkSession
1315

1416

17+
@support_hooks
1518
class FTPSHandler(LocalDFFileHandler):
1619
connection_dto: FTPSConnectionDTO
1720

@@ -28,3 +31,11 @@ def connect(self, spark: SparkSession) -> None:
2831
self.local_df_connection = SparkLocalFS(
2932
spark=spark,
3033
).check()
34+
35+
@slot
36+
def read(self) -> DataFrame:
37+
return super().read()
38+
39+
@slot
40+
def write(self, df: DataFrame) -> None:
41+
return super().write(df)

0 commit comments

Comments
 (0)