Commit 1cf689e

Sort out multigrid control
Parent: 25ac8d0

2 files changed (+136, -47 lines)

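This commit moves the multigrid controller's HTTP calls from URL strings assembled at each call site to keyword-based routing: rather than building a path with url_path_for and passing url=... plus a json=... payload, call sites now give capture_get/capture_post/capture_delete a base_url, a router_name, a function_name, any path parameters (such as session_id or visit_name), and a data payload. The _restart_rsyncer hunk below shows the pattern:

# Before: endpoint URL assembled inline at the call site
restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}"
capture_post(restarted_url, json={"path": str(source)})

# After: routing details passed as keywords; the helper builds the URL
capture_post(
    base_url=self.murfey_url,
    router_name="session_control.router",
    function_name="register_restarted_rsyncer",
    session_id=self.session_id,
    data={"path": str(source)},
)

On the server side, delete_rsyncer stops reading the rsyncer source from a query parameter and instead takes a StringOfPathModel request body, matching the client's new data={"path": str(source)} payload and removing the need for the quote import.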

src/murfey/client/multigrid_control.py (132 additions, 43 deletions)
@@ -8,7 +8,7 @@
 from functools import partial
 from pathlib import Path
 from typing import Dict, List, Optional
-from urllib.parse import quote, urlparse
+from urllib.parse import urlparse
 
 import murfey.client.websocket
 from murfey.client.analyser import Analyser
@@ -19,7 +19,6 @@
 from murfey.client.tui.screens import determine_default_destination
 from murfey.client.watchdir import DirWatcher
 from murfey.util import posix_path
-from murfey.util.api import url_path_for
 from murfey.util.client import (
     capture_delete,
     capture_get,
@@ -57,7 +56,10 @@ class MultigridController:
 
     def __post_init__(self):
         machine_data = capture_get(
-            url=f"{self.murfey_url}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}"
+            base_url=self.murfey_url,
+            router_name="session_control.router",
+            function_name="machine_info_by_instrument",
+            instrument_name=self.instrument_name,
         ).json()
         self.rsync_url = machine_data.get("rsync_url", "")
         self.rsync_module = machine_data.get("rsync_module", "data")
@@ -100,7 +102,9 @@ def __post_init__(self):
         # Calculate the time offset between the client and the server
         current_time = datetime.now()
         server_timestamp = capture_get(
-            url=f"{self.murfey_url}{url_path_for('session_control.router', 'get_current_timestamp')}"
+            base_url=self.murfey_url,
+            router_name="session_control.router",
+            function_name="get_current_timestamp",
         ).json()["timestamp"]
         self.server_time_offset = current_time - datetime.fromtimestamp(
             server_timestamp
@@ -157,7 +161,10 @@ def clean_up_once_dormant(self, running_threads: list[threading.Thread]):
             f"Submitting request to remove session {self.session_id} from database"
         )
         response = capture_delete(
-            f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'remove_session', session_id=self.session_id)}",
+            base_url=str(self._environment.url.geturl()),
+            router_name="session_control.router",
+            function_name="remove_session",
+            session_id=self.session_id,
         )
         success = response.status_code == 200 if response else False
         if not success:
@@ -229,7 +236,10 @@ def _start_rsyncer_multigrid(
         log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}")
         destination_overrides = destination_overrides or {}
         machine_data = capture_get(
-            url=f"{self._environment.url.geturl()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=self.instrument_name)}"
+            base_url=str(self._environment.url.geturl()),
+            router_name="session_control.router",
+            function_name="machine_info_by_instrument",
+            instrument_name=self.instrument_name,
         ).json()
         if destination_overrides.get(source):
             destination = (
@@ -281,11 +291,21 @@ def _start_rsyncer_multigrid(
 
     def _rsyncer_stopped(self, source: Path, explicit_stop: bool = False):
         if explicit_stop:
-            remove_url = f"{self.murfey_url}{url_path_for('session_control.router', 'delete_rsyncer', session_id=self.session_id)}?source={quote(str(source), safe='')}"
-            capture_delete(url=remove_url)
+            capture_delete(
+                base_url=self.murfey_url,
+                router_name="session_control.router",
+                function_name="delete_rsyncer",
+                session_id=self.session_id,
+                data={"path": str(source)},
+            )
         else:
-            stop_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_stopped_rsyncer', session_id=self.session_id)}"
-            capture_post(stop_url, json={"path": str(source)})
+            capture_post(
+                base_url=self.murfey_url,
+                router_name="session_control.router",
+                function_name="register_stopped_rsyncer",
+                session_id=self.session_id,
+                data={"path": str(source)},
+            )
 
     def _finalise_rsyncer(self, source: Path):
         """
@@ -304,8 +324,13 @@ def _finalise_rsyncer(self, source: Path):
 
     def _restart_rsyncer(self, source: Path):
         self.rsync_processes[source].restart()
-        restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}"
-        capture_post(restarted_url, json={"path": str(source)})
+        capture_post(
+            base_url=self.murfey_url,
+            router_name="session_control.router",
+            function_name="register_restarted_rsyncer",
+            session_id=self.session_id,
+            data={"path": str(source)},
+        )
 
     def _request_watcher_stop(self, source: Path):
         self._environment.watchers[source]._stopping = True
@@ -327,8 +352,13 @@ def _start_rsyncer(
         log.info(f"starting rsyncer: {source}")
         if transfer:
             # Always make sure the destination directory exists
-            make_directory_url = f"{self.murfey_url}{url_path_for('file_io_instrument.router', 'make_rsyncer_destination', session_id=self.session_id)}"
-            capture_post(make_directory_url, json={"destination": destination})
+            capture_post(
+                base_url=self.murfey_url,
+                router_name="file_io_instrument.router",
+                function_name="make_rsyncer_destination",
+                session_id=self.session_id,
+                data={"destination": destination},
+            )
         if self._environment:
             self._environment.default_destinations[source] = destination
             if self._environment.gain_ref and visit_path:
@@ -397,18 +427,28 @@ def rsync_result(update: RSyncerUpdate):
                secondary=True,
            )
            if restarted:
-                restarted_url = f"{self.murfey_url}{url_path_for('session_control.router', 'register_restarted_rsyncer', session_id=self.session_id)}"
-                capture_post(restarted_url, json={"path": str(source)})
+                capture_post(
+                    base_url=self.murfey_url,
+                    router_name="session_control.router",
+                    function_name="register_restarted_rsyncer",
+                    session_id=self.session_id,
+                    data={"path": str(source)},
+                )
            else:
-                url = f"{str(self._environment.url.geturl())}{url_path_for('session_control.router', 'register_rsyncer', session_id=self._environment.murfey_session)}"
                rsyncer_data = {
                    "source": str(source),
                    "destination": destination,
                    "session_id": self.session_id,
                    "transferring": self.do_transfer or self._environment.demo,
                    "tag": tag,
                }
-                capture_post(url=url, json=rsyncer_data)
+                capture_post(
+                    base_url=self.murfey_url,
+                    router_name="session_control.router",
+                    function_name="register_rsyncer",
+                    session_id=self._environment.murfey_session,
+                    data=rsyncer_data,
+                )
            self._environment.watchers[source] = DirWatcher(source, settling_time=30)
 
        if not self.analysers.get(source) and analyse:
@@ -513,8 +553,12 @@ def _start_dc(self, metadata_json, from_form: bool = False):
            log.info("Registering tomography processing parameters")
            if context.data_collection_parameters.get("num_eer_frames"):
                eer_response = capture_post(
-                    url=f"{str(self._environment.url.geturl())}{url_path_for('file_io_instrument.router', 'write_eer_fractionation_file', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}",
-                    json={
+                    base_url=str(self._environment.url.geturl()),
+                    router_name="file_io_instrument.router",
+                    function_name="write_eer_fractionation_file",
+                    visit_name=self._environment.visit,
+                    session_id=self._environment.murfey_session,
+                    data={
                        "num_frames": context.data_collection_parameters[
                            "num_eer_frames"
                        ],
@@ -526,17 +570,23 @@ def _start_dc(self, metadata_json, from_form: bool = False):
                eer_fractionation_file = eer_response.json()["eer_fractionation_file"]
                metadata_json.update({"eer_fractionation_file": eer_fractionation_file})
            capture_post(
-                f"{self._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'register_tomo_proc_params', session_id=self._environment.murfey_session)}",
-                json=metadata_json,
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.tomo_router",
+                function_name="register_tomo_proc_params",
+                session_id=self._environment.murfey_session,
+                data=metadata_json,
            )
            capture_post(
-                f"{self._environment.url.geturl()}{url_path_for('workflow.tomo_router', 'flush_tomography_processing', visit_name=self._environment.visit, session_id=self._environment.murfey_session)}",
-                json={"rsync_source": str(source)},
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.tomo_router",
+                function_name="flush_tomography_processing",
+                visit_name=self._environment.visit,
+                session_id=self._environment.murfey_session,
+                data={"rsync_source": str(source)},
            )
            log.info("Tomography processing flushed")
 
        elif isinstance(context, SPAModularContext):
-            url = f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'register_dc_group', visit_name=self._environment.visit, session_id=self.session_id)}"
            dcg_data = {
                "experiment_type": "single particle",
                "experiment_type_id": 37,
@@ -552,7 +602,14 @@ def _start_dc(self, metadata_json, from_form: bool = False):
                    else None
                ),
            }
-            capture_post(url, json=dcg_data)
+            capture_post(
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.router",
+                function_name="register_dc_group",
+                visit_name=self._environment.visit,
+                session_id=self.session_id,
+                data=dcg_data,
+            )
            if from_form:
                data = {
                    "voltage": metadata_json["voltage"],
@@ -575,8 +632,12 @@ def _start_dc(self, metadata_json, from_form: bool = False):
                    "phase_plate": metadata_json.get("phase_plate", False),
                }
                capture_post(
-                    f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'start_dc', visit_name=self._environment.visit, session_id=self.session_id)}",
-                    json=data,
+                    base_url=str(self._environment.url.geturl()),
+                    router_name="workflow.router",
+                    function_name="start_dc",
+                    visit_name=self._environment.visit,
+                    session_id=self.session_id,
+                    data=data,
                )
                for recipe in (
                    "em-spa-preprocess",
@@ -586,17 +647,24 @@ def _start_dc(self, metadata_json, from_form: bool = False):
                    "em-spa-refine",
                ):
                    capture_post(
-                        f"{str(self._environment.url.geturl())}{url_path_for('workflow.router', 'register_proc', visit_name=self._environment.visit, session_id=self.session_id)}",
-                        json={
+                        base_url=str(self._environment.url.geturl()),
+                        router_name="workflow.router",
+                        function_name="register_proc",
+                        visit_name=self._environment.visit,
+                        session_id=self.session_id,
+                        data={
                            "tag": str(source),
                            "source": str(source),
                            "recipe": recipe,
                        },
                    )
            log.info(f"Posting SPA processing parameters: {metadata_json}")
            response = capture_post(
-                f"{self._environment.url.geturl()}{url_path_for('workflow.spa_router', 'register_spa_proc_params', session_id=self.session_id)}",
-                json={
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.spa_router",
+                function_name="register_spa_proc_params",
+                session_id=self.session_id,
+                data={
                    **{
                        k: None if v == "None" else v
                        for k, v in metadata_json.items()
@@ -607,14 +675,17 @@ def _start_dc(self, metadata_json, from_form: bool = False):
            if response and not str(response.status_code).startswith("2"):
                log.warning(f"{response.reason}")
            capture_post(
-                f"{self._environment.url.geturl()}{url_path_for('workflow.spa_router', 'flush_spa_processing', visit_name=self._environment.visit, session_id=self.session_id)}",
-                json={"tag": str(source)},
+                base_url=str(self._environment.url.geturl()),
+                router_name="workflow.spa_router",
+                function_name="flush_spa_processing",
+                visit_name=self._environment.visit,
+                session_id=self.session_id,
+                data={"tag": str(source)},
            )
 
    def _increment_file_count(
        self, observed_files: List[Path], source: str, destination: str
    ):
-        url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_file_count', visit_name=self._environment.visit)}"
        num_data_files = len(
            [
                f
@@ -630,15 +701,20 @@ def _increment_file_count(
            "increment_count": len(observed_files),
            "increment_data_count": num_data_files,
        }
-        capture_post(url=url, json=data)
+        capture_post(
+            base_url=str(self._environment.url.geturl()),
+            router_name="prometheus.router",
+            function_name="increment_rsync_file_count",
+            visit_name=self._environment.visit,
+            data=data,
+        )
 
    # Prometheus can handle higher traffic so update for every transferred file rather
    # than batching as we do for the Murfey database updates in _increment_transferred_files
    def _increment_transferred_files_prometheus(
        self, update: RSyncerUpdate, source: str, destination: str
    ):
        if update.outcome is TransferResult.SUCCESS:
-            url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files_prometheus', visit_name=self._environment.visit)}"
            data_files = (
                [update]
                if update.file_path.suffix in self._data_suffixes
@@ -657,7 +733,13 @@ def _increment_transferred_files_prometheus(
                "increment_data_count": len(data_files),
                "data_bytes": sum(f.file_size for f in data_files),
            }
-            capture_post(url=url, json=data)
+            capture_post(
+                base_url=str(self._environment.url.geturl()),
+                router_name="prometheus.router",
+                function_name="increment_rsync_transferred_files_prometheus",
+                visit_name=self._environment.visit,
+                data=data,
+            )
 
    def _increment_transferred_files(
        self,
@@ -666,10 +748,12 @@ def _increment_transferred_files(
        source: str,
        destination: str,
    ):
-        skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}"
        capture_post(
-            url=skip_url,
-            json={
+            base_url=str(self._environment.url.geturl()),
+            router_name="prometheus.router",
+            function_name="increment_rsync_skipped_files_prometheus",
+            visit_name=self._environment.visit,
+            data={
                "source": source,
                "session_id": self.session_id,
                "increment_count": num_skipped_files,
@@ -681,7 +765,6 @@ def _increment_transferred_files(
            ]
        if not checked_updates:
            return
-        url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_transferred_files', visit_name=self._environment.visit)}"
        data_files = [
            u
            for u in updates
@@ -699,4 +782,10 @@ def _increment_transferred_files(
            "increment_data_count": len(data_files),
            "data_bytes": sum(f.file_size for f in data_files),
        }
-        capture_post(url=url, json=data)
+        capture_post(
+            base_url=str(self._environment.url.geturl()),
+            router_name="prometheus.router",
+            function_name="increment_rsync_transferred_files",
+            visit_name=self._environment.visit,
+            data=data,
+        )
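The matching changes to capture_get/capture_post/capture_delete in murfey.util.client are not shown in this commit. Judging from the call sites above, each helper now resolves the endpoint itself, treating leftover keyword arguments as path parameters for url_path_for. A minimal sketch of capture_post under those assumptions (the real helper may also attach headers, authentication, and logging that this omits):

from typing import Any, Optional

import requests

from murfey.util.api import url_path_for


def capture_post(
    base_url: str,
    router_name: str,
    function_name: str,
    data: Any = None,
    **path_params: Any,
) -> Optional[requests.Response]:
    # Resolve the endpoint path from the router and function names,
    # substituting path parameters such as session_id or visit_name
    path = url_path_for(router_name, function_name, **path_params)
    try:
        return requests.post(f"{base_url}{path}", json=data)
    except requests.RequestException:
        # The 'capture' naming, and call sites like
        # 'success = response.status_code == 200 if response else False',
        # suggest failures are swallowed and None is returned
        return None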

src/murfey/server/api/session_control.py (4 additions, 4 deletions)
@@ -285,19 +285,19 @@ def register_restarted_rsyncer(
 
 
 @router.delete("/sessions/{session_id}/rsyncer")
-def delete_rsyncer(session_id: int, source: Path, db=murfey_db):
+def delete_rsyncer(session_id: int, rsyncer_source: StringOfPathModel, db=murfey_db):
    try:
        rsync_instance = db.exec(
            select(RsyncInstance)
            .where(RsyncInstance.session_id == session_id)
-            .where(RsyncInstance.source == str(source))
+            .where(RsyncInstance.source == str(rsyncer_source.path))
        ).one()
        db.delete(rsync_instance)
        db.commit()
    except Exception:
        logger.error(
-            f"Failed to delete rsyncer for source directory {sanitise(str(source))!r} "
-            f"in session {session_id}.",
+            "Failed to delete rsyncer for source directory "
+            f"{sanitise(str(rsyncer_source.path))!r} in session {session_id}.",
            exc_info=True,
        )
    )