11# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22# SPDX-License-Identifier: Apache-2.0
33import logging
4+ from tempfile import TemporaryDirectory
45from typing import Any
56
67from syncmaster .db .models import Connection , Run
@@ -103,40 +104,48 @@ def __init__(
103104 target_connection : Connection ,
104105 target_auth_data : dict ,
105106 ):
107+ self .temp_dir = TemporaryDirectory (prefix = "syncmaster_" )
108+
106109 self .run = run
107110 self .source_handler = self .get_handler (
108111 connection_data = source_connection .data ,
109112 transfer_params = run .transfer .source_params ,
110113 transformations = run .transfer .transformations ,
111114 connection_auth_data = source_auth_data ,
115+ temp_dir = TemporaryDirectory (dir = self .temp_dir .name , prefix = "downloaded_" ),
112116 )
113117 self .target_handler = self .get_handler (
114118 connection_data = target_connection .data ,
115119 transfer_params = run .transfer .target_params ,
116120 transformations = run .transfer .transformations ,
117121 connection_auth_data = target_auth_data ,
122+ temp_dir = TemporaryDirectory (dir = self .temp_dir .name , prefix = "written_" ),
118123 )
119124
120125 def perform_transfer (self , settings : WorkerAppSettings ) -> None :
121- spark = settings .worker .CREATE_SPARK_SESSION_FUNCTION (
122- run = self .run ,
123- source = self .source_handler .connection_dto ,
124- target = self .target_handler .connection_dto ,
125- )
126+ try :
127+ spark = settings .worker .CREATE_SPARK_SESSION_FUNCTION (
128+ run = self .run ,
129+ source = self .source_handler .connection_dto ,
130+ target = self .target_handler .connection_dto ,
131+ )
126132
127- with spark :
128- self .source_handler .connect (spark )
129- self .target_handler .connect (spark )
133+ with spark :
134+ self .source_handler .connect (spark )
135+ self .target_handler .connect (spark )
130136
131- df = self .source_handler .read ()
132- self .target_handler .write (df )
137+ df = self .source_handler .read ()
138+ self .target_handler .write (df )
139+ finally :
140+ self .temp_dir .cleanup ()
133141
134142 def get_handler (
135143 self ,
136144 connection_data : dict [str , Any ],
137145 connection_auth_data : dict ,
138146 transfer_params : dict [str , Any ],
139147 transformations : list [dict ],
148+ temp_dir : TemporaryDirectory ,
140149 ) -> Handler :
141150 connection_data .update (connection_auth_data )
142151 connection_data .pop ("type" )
@@ -150,4 +159,5 @@ def get_handler(
150159 return handler (
151160 connection_dto = connection_dto (** connection_data ),
152161 transfer_dto = transfer_dto (** transfer_params , transformations = transformations ),
162+ temp_dir = temp_dir ,
153163 )
0 commit comments