11# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
22# SPDX-License-Identifier: Apache-2.0
33import logging
4+ import tempfile
5+ from pathlib import Path
46from typing import Any
57
68from syncmaster .db .models import Connection , Run
@@ -103,40 +105,48 @@ def __init__(
103105 target_connection : Connection ,
104106 target_auth_data : dict ,
105107 ):
108+ self .temp_dir = tempfile .TemporaryDirectory (prefix = "syncmaster_" )
109+
106110 self .run = run
107111 self .source_handler = self .get_handler (
108112 connection_data = source_connection .data ,
109113 transfer_params = run .transfer .source_params ,
110114 transformations = run .transfer .transformations ,
111115 connection_auth_data = source_auth_data ,
116+ temp_dir = tempfile .TemporaryDirectory (dir = self .temp_dir .name , prefix = "downloaded_" ),
112117 )
113118 self .target_handler = self .get_handler (
114119 connection_data = target_connection .data ,
115120 transfer_params = run .transfer .target_params ,
116121 transformations = run .transfer .transformations ,
117122 connection_auth_data = target_auth_data ,
123+ temp_dir = tempfile .TemporaryDirectory (dir = self .temp_dir .name , prefix = "written_" ),
118124 )
119125
def perform_transfer(self, settings: WorkerAppSettings) -> None:
    """Copy data from the source handler to the target handler.

    A Spark session is built with the factory configured at
    ``settings.worker.CREATE_SPARK_SESSION_FUNCTION``, both handlers are
    connected to it, and the dataframe read from the source is written to
    the target.

    ``self.temp_dir`` is cleaned up when this method exits, whether the
    transfer succeeded or raised.

    Args:
        settings: Worker settings providing the Spark session factory.
    """
    try:
        # Session creation sits inside the ``try`` so that a failure to
        # start Spark still triggers the temp-dir cleanup below.
        spark = settings.worker.CREATE_SPARK_SESSION_FUNCTION(
            run=self.run,
            source=self.source_handler.connection_dto,
            target=self.target_handler.connection_dto,
        )

        with spark:
            self.source_handler.connect(spark)
            self.target_handler.connect(spark)

            # Read and write while the session context is still open.
            df = self.source_handler.read()
            self.target_handler.write(df)
    finally:
        self.temp_dir.cleanup()
133142
134143 def get_handler (
135144 self ,
136145 connection_data : dict [str , Any ],
137146 connection_auth_data : dict ,
138147 transfer_params : dict [str , Any ],
139148 transformations : list [dict ],
149+ temp_dir : Path ,
140150 ) -> Handler :
141151 connection_data .update (connection_auth_data )
142152 connection_data .pop ("type" )
@@ -150,4 +160,5 @@ def get_handler(
150160 return handler (
151161 connection_dto = connection_dto (** connection_data ),
152162 transfer_dto = transfer_dto (** transfer_params , transformations = transformations ),
163+ temp_dir = temp_dir ,
153164 )
0 commit comments