@@ -192,6 +192,7 @@ def _start_rsyncer_multigrid(
192192 analyse = analyse ,
193193 remove_files = remove_files ,
194194 limited = limited ,
195+ transfer = machine_data .get ("data_transfer_enabled" , True ),
195196 )
196197
197198 def _start_rsyncer (
@@ -203,6 +204,7 @@ def _start_rsyncer(
203204 analyse : bool = True ,
204205 remove_files : bool = False ,
205206 limited : bool = False ,
207+ transfer : bool = True ,
206208 ):
207209 log .info (f"starting rsyncer: { source } " )
208210 if self ._environment :
@@ -219,55 +221,57 @@ def _start_rsyncer(
219221 log .warning (
220222 f"Gain reference file { self ._environment .gain_ref } was not successfully transferred to { visit_path } /processing"
221223 )
222- self .rsync_processes [source ] = RSyncer (
223- source ,
224- basepath_remote = Path (destination ),
225- server_url = self ._url ,
226- # local=self._environment.demo,
227- status_bar = self ._statusbar ,
228- do_transfer = self ._do_transfer ,
229- required_substrings_for_removal = self ._data_substrings ,
230- remove_files = remove_files ,
231- )
224+ if transfer :
225+ self .rsync_processes [source ] = RSyncer (
226+ source ,
227+ basepath_remote = Path (destination ),
228+ server_url = self ._url ,
229+ # local=self._environment.demo,
230+ status_bar = self ._statusbar ,
231+ do_transfer = self ._do_transfer ,
232+ required_substrings_for_removal = self ._data_substrings ,
233+ remove_files = remove_files ,
234+ )
232235
233- def rsync_result (update : RSyncerUpdate ):
234- if not update .base_path :
235- raise ValueError ("No base path from rsyncer update" )
236- if not self .rsync_processes .get (update .base_path ):
237- raise ValueError ("TUI rsync process does not exist" )
238- if update .outcome is TransferResult .SUCCESS :
239- log .debug (
240- f"Succesfully transferred file { str (update .file_path )!r} ({ update .file_size } bytes)"
236+ def rsync_result (update : RSyncerUpdate ):
237+ if not update .base_path :
238+ raise ValueError ("No base path from rsyncer update" )
239+ if not self .rsync_processes .get (update .base_path ):
240+ raise ValueError ("TUI rsync process does not exist" )
241+ if update .outcome is TransferResult .SUCCESS :
242+ log .debug (
243+ f"Succesfully transferred file { str (update .file_path )!r} ({ update .file_size } bytes)"
244+ )
245+ # pass
246+ else :
247+ log .warning (f"Failed to transfer file { str (update .file_path )!r} " )
248+ self .rsync_processes [update .base_path ].enqueue (update .file_path )
249+
250+ self .rsync_processes [source ].subscribe (rsync_result )
251+ self .rsync_processes [source ].subscribe (
252+ partial (
253+ self ._increment_transferred_files_prometheus ,
254+ destination = destination ,
255+ source = str (source ),
241256 )
242- # pass
243- else :
244- log .warning (f"Failed to transfer file { str (update .file_path )!r} " )
245- self .rsync_processes [update .base_path ].enqueue (update .file_path )
246-
247- self .rsync_processes [source ].subscribe (rsync_result )
248- self .rsync_processes [source ].subscribe (
249- partial (
250- self ._increment_transferred_files_prometheus ,
251- destination = destination ,
252- source = str (source ),
253257 )
254- )
255- self . rsync_processes [ source ]. subscribe (
256- partial (
257- self . _increment_transferred_files ,
258- destination = destination ,
259- source = str ( source ),
260- ) ,
261- secondary = True ,
262- )
263- url = f" { str ( self . _url . geturl ()) } /sessions/ { str ( self . _environment . murfey_session ) } /rsyncer"
264- rsyncer_data = {
265- "source " : str ( source ) ,
266- "destination " : destination ,
267- "session_id " : self ._environment . murfey_session ,
268- "transferring" : self . _do_transfer ,
269- }
270- requests . post ( url , json = rsyncer_data )
258+ self . rsync_processes [ source ]. subscribe (
259+ partial (
260+ self . _increment_transferred_files ,
261+ destination = destination ,
262+ source = str ( source ) ,
263+ ),
264+ secondary = True ,
265+ )
266+ url = f" { str ( self . _url . geturl ()) } /sessions/ { str ( self . _environment . murfey_session ) } /rsyncer"
267+ rsyncer_data = {
268+ "source" : str ( source ),
269+ "destination " : destination ,
270+ "session_id " : self . _environment . murfey_session ,
271+ "transferring " : self ._do_transfer ,
272+ }
273+ requests . post ( url , json = rsyncer_data )
274+
271275 self ._environment .watchers [source ] = DirWatcher (source , settling_time = 30 )
272276
273277 if not self .analysers .get (source ) and analyse :
@@ -295,15 +299,36 @@ def rsync_result(update: RSyncerUpdate):
295299 else :
296300 self .analysers [source ].subscribe (self ._data_collection_form )
297301 self .analysers [source ].start ()
298- self .rsync_processes [source ].subscribe (self .analysers [source ].enqueue )
302+ if transfer :
303+ self .rsync_processes [source ].subscribe (self .analysers [source ].enqueue )
299304
300- self .rsync_processes [source ].start ()
305+ if transfer :
306+ self .rsync_processes [source ].start ()
301307
302308 if self ._environment :
303309 if self ._environment .watchers .get (source ):
304- self ._environment .watchers [source ].subscribe (
305- self .rsync_processes [source ].enqueue
306- )
310+ if transfer :
311+ self ._environment .watchers [source ].subscribe (
312+ self .rsync_processes [source ].enqueue
313+ )
314+ else :
315+
316+ def _rsync_update_converter (p : Path ) -> None :
317+ self .analysers [source ].enqueue (
318+ RSyncerUpdate (
319+ file_path = p ,
320+ file_size = 0 ,
321+ outcome = TransferResult .SUCCESS ,
322+ transfer_total = 0 ,
323+ queue_size = 0 ,
324+ base_path = source ,
325+ )
326+ )
327+ return None
328+
329+ self ._environment .watchers [source ].subscribe (
330+ _rsync_update_converter
331+ )
307332 self ._environment .watchers [source ].subscribe (
308333 partial (
309334 self ._increment_file_count ,
0 commit comments