Skip to content

Commit a1996a9

Browse files
committed
Setup for multigrid controller to be able to skip the rsync step
1 parent 7c6548f commit a1996a9

File tree

1 file changed

+67
-44
lines changed

1 file changed

+67
-44
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 67 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ def _start_rsyncer(
173173
remove_files: bool = False,
174174
tag: str = "",
175175
limited: bool = False,
176+
transfer: bool = True,
176177
):
177178
log.info(f"starting rsyncer: {source}")
178179
if self._environment:
@@ -189,47 +190,48 @@ def _start_rsyncer(
189190
log.warning(
190191
f"Gain reference file {self._environment.gain_ref} was not successfully transferred to {visit_path}/processing"
191192
)
192-
self.rsync_processes[source] = RSyncer(
193-
source,
194-
basepath_remote=Path(destination),
195-
server_url=self._environment.url,
196-
stop_callback=self._rsyncer_stopped,
197-
do_transfer=self.do_transfer,
198-
remove_files=remove_files,
199-
)
193+
if transfer:
194+
self.rsync_processes[source] = RSyncer(
195+
source,
196+
basepath_remote=Path(destination),
197+
server_url=self._environment.url,
198+
stop_callback=self._rsyncer_stopped,
199+
do_transfer=self.do_transfer,
200+
remove_files=remove_files,
201+
)
200202

201-
def rsync_result(update: RSyncerUpdate):
202-
if not update.base_path:
203-
raise ValueError("No base path from rsyncer update")
204-
if not self.rsync_processes.get(update.base_path):
205-
raise ValueError("TUI rsync process does not exist")
206-
if update.outcome is TransferResult.SUCCESS:
207-
# log.info(
208-
# f"File {str(update.file_path)!r} successfully transferred ({update.file_size} bytes)"
209-
# )
210-
pass
211-
else:
212-
log.warning(f"Failed to transfer file {str(update.file_path)!r}")
213-
self.rsync_processes[update.base_path].enqueue(update.file_path)
203+
def rsync_result(update: RSyncerUpdate):
204+
if not update.base_path:
205+
raise ValueError("No base path from rsyncer update")
206+
if not self.rsync_processes.get(update.base_path):
207+
raise ValueError("TUI rsync process does not exist")
208+
if update.outcome is TransferResult.SUCCESS:
209+
# log.info(
210+
# f"File {str(update.file_path)!r} successfully transferred ({update.file_size} bytes)"
211+
# )
212+
pass
213+
else:
214+
log.warning(f"Failed to transfer file {str(update.file_path)!r}")
215+
self.rsync_processes[update.base_path].enqueue(update.file_path)
214216

215-
self.rsync_processes[source].subscribe(rsync_result)
216-
self.rsync_processes[source].subscribe(
217-
partial(
218-
self._increment_transferred_files,
219-
destination=destination,
220-
source=str(source),
221-
),
222-
secondary=True,
223-
)
224-
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
225-
rsyncer_data = {
226-
"source": str(source),
227-
"destination": destination,
228-
"session_id": self.session_id,
229-
"transferring": self.do_transfer or self._environment.demo,
230-
"tag": tag,
231-
}
232-
requests.post(url, json=rsyncer_data)
217+
self.rsync_processes[source].subscribe(rsync_result)
218+
self.rsync_processes[source].subscribe(
219+
partial(
220+
self._increment_transferred_files,
221+
destination=destination,
222+
source=str(source),
223+
),
224+
secondary=True,
225+
)
226+
url = f"{str(self._environment.url.geturl())}/sessions/{str(self._environment.murfey_session)}/rsyncer"
227+
rsyncer_data = {
228+
"source": str(source),
229+
"destination": destination,
230+
"session_id": self.session_id,
231+
"transferring": self.do_transfer or self._environment.demo,
232+
"tag": tag,
233+
}
234+
requests.post(url, json=rsyncer_data)
233235
self._environment.watchers[source] = DirWatcher(source, settling_time=30)
234236

235237
if not self.analysers.get(source) and analyse:
@@ -254,15 +256,36 @@ def rsync_result(update: RSyncerUpdate):
254256
else:
255257
self.analysers[source].subscribe(self._data_collection_form)
256258
self.analysers[source].start()
257-
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
259+
if transfer:
260+
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
258261

259-
self.rsync_processes[source].start()
262+
if transfer:
263+
self.rsync_processes[source].start()
260264

261265
if self._environment:
262266
if self._environment.watchers.get(source):
263-
self._environment.watchers[source].subscribe(
264-
self.rsync_processes[source].enqueue
265-
)
267+
if transfer:
268+
self._environment.watchers[source].subscribe(
269+
self.rsync_processes[source].enqueue
270+
)
271+
else:
272+
# the watcher and rsyncer don't notify with the same object so conversion required here
273+
def _rsync_update_converter(p: Path) -> None:
274+
self.analysers[source].enqueue(
275+
RSyncerUpdate(
276+
file_path=p,
277+
file_size=0,
278+
outcome=TransferResult.SUCCESS,
279+
transfer_total=0,
280+
queue_size=0,
281+
base_path=source,
282+
)
283+
)
284+
return None
285+
286+
self._environment.watchers[source].subscribe(
287+
_rsync_update_converter
288+
)
266289
self._environment.watchers[source].subscribe(
267290
partial(
268291
self._increment_file_count,

0 commit comments

Comments
 (0)