44from tempfile import TemporaryDirectory
55from typing import Any
66
7+ from horizon .client .auth import LoginPassword
8+ from horizon .client .sync import HorizonClientSync
9+ from horizon .commons .schemas .v1 import (
10+ NamespaceCreateRequestV1 ,
11+ NamespacePaginateQueryV1 ,
12+ )
13+ from horizon_hwm_store import HorizonHWMStore
14+ from onetl .strategy import IncrementalStrategy
15+
716from syncmaster .db .models import Connection , Run
817from syncmaster .dto .connections import (
918 ClickhouseConnectionDTO ,
3645 SFTPTransferDTO ,
3746 WebDAVTransferDTO ,
3847)
48+ from syncmaster .dto .transfers_strategy import Strategy
3949from syncmaster .exceptions .connection import ConnectionTypeNotRecognizedError
4050from syncmaster .worker .handlers .base import Handler
4151from syncmaster .worker .handlers .db .clickhouse import ClickhouseHandler
139149
140150
141151class TransferController :
152+ settings : WorkerAppSettings
142153 source_handler : Handler
143154 target_handler : Handler
144155
145156 def __init__ (
146157 self ,
158+ settings : WorkerAppSettings ,
147159 run : Run ,
148160 source_connection : Connection ,
149161 source_auth_data : dict ,
@@ -152,27 +164,32 @@ def __init__(
152164 ):
153165 self .temp_dir = TemporaryDirectory (prefix = f"syncmaster_{ run .id } _" )
154166
167+ self .settings = settings
155168 self .run = run
156169 self .source_handler = self .get_handler (
157170 connection_data = source_connection .data ,
158171 run_data = {"id" : run .id , "created_at" : run .created_at },
172+ transfer_id = run .transfer .id ,
159173 transfer_params = run .transfer .source_params ,
174+ strategy_params = run .transfer .strategy_params ,
160175 transformations = run .transfer .transformations ,
161176 connection_auth_data = source_auth_data ,
162177 temp_dir = TemporaryDirectory (dir = self .temp_dir .name , prefix = "downloaded_" ),
163178 )
164179 self .target_handler = self .get_handler (
165180 connection_data = target_connection .data ,
166181 run_data = {"id" : run .id , "created_at" : run .created_at },
182+ transfer_id = run .transfer .id ,
167183 transfer_params = run .transfer .target_params ,
184+ strategy_params = run .transfer .strategy_params ,
168185 transformations = run .transfer .transformations ,
169186 connection_auth_data = target_auth_data ,
170187 temp_dir = TemporaryDirectory (dir = self .temp_dir .name , prefix = "written_" ),
171188 )
172189
173- def perform_transfer (self , settings : WorkerAppSettings ) -> None :
190+ def perform_transfer (self ) -> None :
174191 try :
175- spark = settings .worker .CREATE_SPARK_SESSION_FUNCTION (
192+ spark = self . settings .worker .CREATE_SPARK_SESSION_FUNCTION (
176193 run = self .run ,
177194 source = self .source_handler .connection_dto ,
178195 target = self .target_handler .connection_dto ,
@@ -182,6 +199,9 @@ def perform_transfer(self, settings: WorkerAppSettings) -> None:
182199 self .source_handler .connect (spark )
183200 self .target_handler .connect (spark )
184201
202+ if self .source_handler .transfer_dto .strategy .type == "incremental" :
203+ return self ._perform_incremental_transfer ()
204+
185205 df = self .source_handler .read ()
186206 self .target_handler .write (df )
187207 finally :
@@ -192,7 +212,9 @@ def get_handler(
192212 connection_data : dict [str , Any ],
193213 connection_auth_data : dict ,
194214 run_data : dict [str , Any ],
215+ transfer_id : int ,
195216 transfer_params : dict [str , Any ],
217+ strategy_params : dict [str , Any ],
196218 transformations : list [dict ],
197219 temp_dir : TemporaryDirectory ,
198220 ) -> Handler :
@@ -207,7 +229,48 @@ def get_handler(
207229
208230 return handler (
209231 connection_dto = connection_dto (** connection_data ),
210- transfer_dto = transfer_dto (** transfer_params , transformations = transformations ),
232+ transfer_dto = transfer_dto (
233+ id = transfer_id ,
234+ strategy = Strategy .from_dict (strategy_params ),
235+ transformations = transformations ,
236+ ** transfer_params ,
237+ ),
211238 run_dto = run_dto (** run_data ),
212239 temp_dir = temp_dir ,
213240 )
241+
242+ def _perform_incremental_transfer (self ) -> None :
243+ self ._ensure_horizon_namespace_exists ()
244+
245+ with HorizonHWMStore (
246+ api_url = self .settings .horizon .url ,
247+ auth = LoginPassword (login = self .settings .horizon .user , password = self .settings .horizon .password ),
248+ namespace = self .settings .horizon .namespace ,
249+ ) as hwm_store :
250+ with IncrementalStrategy ():
251+ hwm_name = "_" .join (
252+ [
253+ str (self .source_handler .transfer_dto .id ),
254+ self .source_handler .connection_dto .type ,
255+ self .source_handler .transfer_dto .directory_path ,
256+ ],
257+ )
258+ hwm = hwm_store .get_hwm (hwm_name )
259+
260+ # S3 & HDFS sources do not currently support incremental reading
261+ if not isinstance (self .source_handler , (S3Handler , HDFSHandler )):
262+ self .source_handler .hwm = hwm
263+ self .target_handler .hwm = hwm
264+
265+ df = self .source_handler .read ()
266+ self .target_handler .write (df )
267+
268+ def _ensure_horizon_namespace_exists (self ) -> None :
269+ client = HorizonClientSync (
270+ base_url = self .settings .horizon .url ,
271+ auth = LoginPassword (login = self .settings .horizon .user , password = self .settings .horizon .password ),
272+ )
273+ client .authorize ()
274+ namespace_query = NamespacePaginateQueryV1 (name = self .settings .horizon .namespace )
275+ if not client .paginate_namespaces (query = namespace_query ).items :
276+ client .create_namespace (NamespaceCreateRequestV1 (name = self .settings .horizon .namespace ))
0 commit comments