Skip to content

Commit 59297f9

Browse files
Turn off analysers when a request is made not to process (#590)
* Add 'process' column to 'Session' table to record if processing is requested or not * Allow analysis to be turned off via a query parameter * Added logic for server to work out from the session whether processing should be enabled * Added '_analyse' attribute to 'MultigridDirWatcher' to control when to process data; adjusts logic when notifying listeners to take 'self_analyse' into account --------- Co-authored-by: Eu Pin Tien <[email protected]>
1 parent 010f227 commit 59297f9

File tree

7 files changed

+81
-36
lines changed

7 files changed

+81
-36
lines changed

src/murfey/client/multigrid_control.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ def _start_rsyncer_multigrid(
160160
tag: str = "",
161161
limited: bool = False,
162162
):
163-
log.info(f"starting multigrid rsyncer: {source}")
163+
log.info(f"Starting multigrid rsyncer: {source}")
164+
log.debug(f"Analysis of {source} is {('enabled' if analyse else 'disabled')}")
164165
destination_overrides = destination_overrides or {}
165166
machine_data = requests.get(
166167
f"{self._environment.url.geturl()}/instruments/{self.instrument_name}/machine"

src/murfey/client/watchdir_multigrid.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@ def __init__(
2121
):
2222
super().__init__()
2323
self._basepath = Path(path)
24-
self._skip_existing_processing = skip_existing_processing
25-
self._seen_dirs: List[Path] = []
26-
self._stopping = False
2724
self._machine_config = machine_config
25+
self._seen_dirs: List[Path] = []
2826
self.thread = threading.Thread(
2927
name=f"MultigridDirWatcher {self._basepath}",
3028
target=self._process,
3129
daemon=True,
3230
)
31+
# Toggleable settings
32+
self._analyse = True
33+
self._skip_existing_processing = skip_existing_processing
34+
self._stopping = False
3335

3436
def start(self):
3537
if self.thread.is_alive():
@@ -60,8 +62,14 @@ def _process(self):
6062
include_mid_path=False,
6163
use_suggested_path=False,
6264
analyse=(
63-
d.name
64-
in self._machine_config["analyse_created_directories"]
65+
(
66+
d.name
67+
in self._machine_config[
68+
"analyse_created_directories"
69+
]
70+
)
71+
if self._analyse
72+
else False
6573
),
6674
tag="atlas",
6775
)
@@ -72,23 +80,25 @@ def _process(self):
7280
d,
7381
extra_directory=f"metadata_{d.name}",
7482
include_mid_path=False,
75-
analyse=True, # not (first_loop and self._skip_existing_processing),
83+
analyse=self._analyse,
7684
limited=True,
7785
tag="metadata",
7886
)
7987
self._seen_dirs.append(d)
8088
processing_started = False
8189
for d02 in (d.parent.parent / d.name).glob("Images-Disc*"):
8290
if d02 not in self._seen_dirs:
83-
# if skip exisiting processing is set then do not process for any
84-
# data directories found on the first loop
85-
# this allows you to avoid triggering processing again if murfey is restarted
91+
# If 'skip_existing_processing' is set, do not process for
92+
# any data directories found on the first loop.
93+
# This allows you to avoid triggering processing again if Murfey is restarted
8694
self.notify(
8795
d02,
8896
include_mid_path=False,
8997
remove_files=True,
90-
analyse=not (
91-
first_loop and self._skip_existing_processing
98+
analyse=(
99+
not (first_loop and self._skip_existing_processing)
100+
if self._analyse
101+
else False
92102
),
93103
tag="fractions",
94104
)
@@ -104,8 +114,10 @@ def _process(self):
104114
self.notify(
105115
d02,
106116
include_mid_path=False,
107-
analyse=not (
108-
first_loop and self._skip_existing_processing
117+
analyse=(
118+
not (first_loop and self._skip_existing_processing)
119+
if self._analyse
120+
else False
109121
),
110122
tag="fractions",
111123
)

src/murfey/instrument_server/api.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from functools import partial
88
from logging import getLogger
99
from pathlib import Path
10-
from typing import Annotated, Any, Dict, List, Optional, Union
10+
from typing import Annotated, Any, Optional
1111
from urllib.parse import urlparse
1212

1313
import requests
@@ -27,9 +27,9 @@
2727

2828
logger = getLogger("murfey.instrument_server.api")
2929

30-
watchers: Dict[Union[str, int], MultigridDirWatcher] = {}
31-
rsyncers: Dict[str, RSyncer] = {}
32-
controllers: Dict[int, MultigridController] = {}
30+
watchers: dict[str | int, MultigridDirWatcher] = {}
31+
rsyncers: dict[str, RSyncer] = {}
32+
controllers: dict[int, MultigridController] = {}
3333
data_collection_parameters: dict = {}
3434
tokens = {}
3535

@@ -187,9 +187,11 @@ def setup_multigrid_watcher(
187187

188188

189189
@router.post("/sessions/{session_id}/start_multigrid_watcher")
190-
def start_multigrid_watcher(session_id: MurfeySessionID):
190+
def start_multigrid_watcher(session_id: MurfeySessionID, process: bool = True):
191191
if watchers.get(session_id) is None:
192192
return {"success": False}
193+
if not process:
194+
watchers[session_id]._analyse = False
193195
watchers[session_id].start()
194196
return {"success": True}
195197

@@ -322,7 +324,7 @@ def register_processing_parameters(
322324
)
323325
def get_possible_gain_references(
324326
instrument_name: str, session_id: MurfeySessionID
325-
) -> List[File]:
327+
) -> list[File]:
326328
machine_config = requests.get(
327329
f"{_get_murfey_url()}/instruments/{sanitise_nonpath(instrument_name)}/machine",
328330
headers={"Authorization": f"Bearer {tokens[session_id]}"},

src/murfey/server/api/__init__.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -372,13 +372,22 @@ async def get_session(session_id: MurfeySessionID, db=murfey_db) -> SessionClien
372372
def increment_rsync_file_count(
373373
visit_name: str, rsyncer_info: RsyncerInfo, db=murfey_db
374374
):
375-
rsync_instance = db.exec(
376-
select(RsyncInstance).where(
377-
RsyncInstance.source == rsyncer_info.source,
378-
RsyncInstance.destination == rsyncer_info.destination,
379-
RsyncInstance.session_id == rsyncer_info.session_id,
375+
try:
376+
rsync_instance = db.exec(
377+
select(RsyncInstance).where(
378+
RsyncInstance.source == rsyncer_info.source,
379+
RsyncInstance.destination == rsyncer_info.destination,
380+
RsyncInstance.session_id == rsyncer_info.session_id,
381+
)
382+
).one()
383+
except Exception:
384+
log.error(
385+
f"Failed to find rsync instance for visit {sanitise(visit_name)} "
386+
"with the following properties: \n"
387+
f"{rsyncer_info.dict()}",
388+
exc_info=True,
380389
)
381-
).one()
390+
return None
382391
rsync_instance.files_counted += rsyncer_info.increment_count
383392
db.add(rsync_instance)
384393
db.commit()
@@ -1996,6 +2005,17 @@ def create_session(
19962005
return sid
19972006

19982007

2008+
@router.post("/sessions/{session_id}")
2009+
def update_session(
2010+
session_id: MurfeySessionID, process: bool = True, db=murfey_db
2011+
) -> None:
2012+
session = db.exec(select(Session).where(Session.id == session_id)).one()
2013+
session.process = process
2014+
db.add(session)
2015+
db.commit()
2016+
return None
2017+
2018+
19992019
@router.put("/sessions/{session_id}/current_gain_ref")
20002020
def update_current_gain_ref(
20012021
session_id: MurfeySessionID, new_gain_ref: CurrentGainRef, db=murfey_db

src/murfey/server/api/instrument.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,21 +142,26 @@ async def setup_multigrid_watcher(
142142
@router.post("/sessions/{session_id}/start_multigrid_watcher")
143143
async def start_multigrid_watcher(session_id: MurfeySessionID, db=murfey_db):
144144
data = {}
145-
instrument_name = (
146-
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
147-
)
145+
session = db.exec(select(Session).where(Session.id == session_id)).one()
146+
process = session.process
147+
instrument_name = session.instrument_name
148148
machine_config = get_machine_config(instrument_name=instrument_name)[
149149
instrument_name
150150
]
151151
if machine_config.instrument_server_url:
152+
log.debug(
153+
f"Submitting request to start multigrid watcher for session {session_id} "
154+
f"with processing {('enabled' if process else 'disabled')}"
155+
)
152156
async with aiohttp.ClientSession() as clientsession:
153157
async with clientsession.post(
154-
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher",
158+
f"{machine_config.instrument_server_url}/sessions/{session_id}/start_multigrid_watcher?process={'true' if process else 'false'}",
155159
headers={
156160
"Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}"
157161
},
158162
) as resp:
159163
data = await resp.json()
164+
log.debug(f"Received response: {data}")
160165
return data
161166

162167

src/murfey/server/api/processing_parameters.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from logging import getLogger
22
from typing import Optional
33

4+
import sqlalchemy
45
from fastapi import APIRouter, Depends
56
from pydantic import BaseModel
67
from sqlmodel import Session, select
@@ -24,12 +25,15 @@ class EditableSessionProcessingParameters(BaseModel):
2425
@router.get("/sessions/{session_id}/session_processing_parameters")
2526
def get_session_processing_parameters(
2627
session_id: MurfeySessionID, db: Session = murfey_db
27-
) -> EditableSessionProcessingParameters:
28-
proc_params = db.exec(
29-
select(SessionProcessingParameters).where(
30-
SessionProcessingParameters.session_id == session_id
31-
)
32-
).one()
28+
) -> Optional[EditableSessionProcessingParameters]:
29+
try:
30+
proc_params = db.exec(
31+
select(SessionProcessingParameters).where(
32+
SessionProcessingParameters.session_id == session_id
33+
)
34+
).one()
35+
except sqlalchemy.exc.NoResultFound:
36+
return None
3337
return EditableSessionProcessingParameters(
3438
gain_ref=proc_params.gain_ref,
3539
dose_per_frame=proc_params.dose_per_frame,

src/murfey/util/db.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class Session(SQLModel, table=True): # type: ignore
4949
started: bool = Field(default=False)
5050
current_gain_ref: str = Field(default="")
5151
instrument_name: str = Field(default="")
52+
process: bool = Field(default=True)
5253
visit_end_time: Optional[datetime] = Field(default=None)
5354

5455
# CLEM Workflow

0 commit comments

Comments
 (0)