
Commit 69afc1d

Merged CLEM workflow fix changes into current branch

1 parent 9ecc94c · commit 69afc1d

File tree: 9 files changed, +167 −107 lines changed

src/murfey/client/contexts/clem.py

Lines changed: 10 additions & 8 deletions

@@ -4,7 +4,6 @@
 """
 
 import logging
-from datetime import datetime
 from pathlib import Path
 from typing import Dict, Generator, List, Optional
 from urllib.parse import quote
@@ -32,13 +31,16 @@ def _file_transferred_to(
         instrument_name=environment.instrument_name,
         demo=environment.demo,
     )
-    # rsync basepath and modules are set in the microscope's configuration YAML file
-    return (
-        Path(machine_config.get("rsync_basepath", ""))
-        / str(datetime.now().year)
-        / source.name
-        / file_path.relative_to(source)
+
+    # Construct destination path
+    base_destination = Path(machine_config.get("rsync_basepath", "")) / Path(
+        environment.default_destinations[source]
     )
+    # Add visit number to the path if it's not present in default destination
+    if environment.visit not in environment.default_destinations[source]:
+        base_destination = base_destination / environment.visit
+    destination = base_destination / file_path.relative_to(source)
+    return destination
 
 
 def _get_source(
@@ -292,7 +294,7 @@ def post_transfer(
             post_result = self.process_tiff_series(tiff_dataset, environment)
             if post_result is False:
                 return False
-
+            logger.info(f"Started preprocessing of TIFF series {series_name!r}")
         else:
             logger.debug(f"TIFF series {series_name!r} is still being processed")
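The effect of the rewritten _file_transferred_to is easiest to see in isolation: the destination is now anchored on the source's default destination rather than the calendar year, with the visit appended only when it is missing. A minimal sketch of the same logic, using hypothetical paths and visit name in place of the real environment and machine config objects:

from pathlib import Path

# Hypothetical stand-ins for the real environment and machine config
rsync_basepath = Path("/dls/m12/data")
default_destinations = {Path("/local/m12/Data"): "2024/cm12345-1"}
visit = "cm12345-1"

source = Path("/local/m12/Data")
file_path = source / "grid1" / "image_001.tiff"

base_destination = rsync_basepath / Path(default_destinations[source])
# Append the visit only if the default destination does not already contain it
if visit not in default_destinations[source]:
    base_destination = base_destination / visit
destination = base_destination / file_path.relative_to(source)
print(destination)  # /dls/m12/data/2024/cm12345-1/grid1/image_001.tiff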

src/murfey/instrument_server/api.py

Lines changed: 15 additions & 6 deletions

@@ -142,12 +142,22 @@ def check_token(session_id: MurfeySessionID):
 def setup_multigrid_watcher(
     session_id: MurfeySessionID, watcher_spec: MultigridWatcherSpec
 ):
+    # Return 'True' if controllers are already set up
     if controllers.get(session_id) is not None:
         return {"success": True}
+
     label = watcher_spec.label
     for sid, controller in controllers.items():
         if controller.dormant:
             del controllers[sid]
+
+    # Load machine config as dictionary
+    machine_config: dict[str, Any] = requests.get(
+        f"{_get_murfey_url()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=sanitise_nonpath(watcher_spec.instrument_name))}",
+        headers={"Authorization": f"Bearer {tokens[session_id]}"},
+    ).json()
+
+    # Set up the multigrid controller
     controllers[session_id] = MultigridController(
         [],
         watcher_spec.visit,
@@ -157,22 +167,21 @@ def setup_multigrid_watcher(
         demo=True,
         do_transfer=True,
         processing_enabled=not watcher_spec.skip_existing_processing,
-        _machine_config=watcher_spec.configuration.dict(),
+        _machine_config=machine_config,
         token=tokens.get(session_id, "token"),
         data_collection_parameters=data_collection_parameters.get(label, {}),
         rsync_restarts=watcher_spec.rsync_restarts,
         visit_end_time=watcher_spec.visit_end_time,
     )
+    # Make child directories, if specified
     watcher_spec.source.mkdir(exist_ok=True)
-    machine_config = requests.get(
-        f"{_get_murfey_url()}{url_path_for('session_control.router', 'machine_info_by_instrument', instrument_name=sanitise_nonpath(watcher_spec.instrument_name))}",
-        headers={"Authorization": f"Bearer {tokens[session_id]}"},
-    ).json()
     for d in machine_config.get("create_directories", []):
         (watcher_spec.source / d).mkdir(exist_ok=True)
+
+    # Set up multigrid directory watcher
     watchers[session_id] = MultigridDirWatcher(
         watcher_spec.source,
-        watcher_spec.configuration.dict(),
+        machine_config,
         skip_existing_processing=watcher_spec.skip_existing_processing,
     )
     watchers[session_id].subscribe(
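The watcher endpoint now resolves the machine configuration itself from the Murfey server's machine_info_by_instrument route instead of trusting a configuration field in the request body. A minimal sketch of that lookup, assuming a placeholder URL path and token (in the real handler the path is built with url_path_for and the token comes from the per-session store):

import requests

# Hypothetical values; the handler above derives these from _get_murfey_url(),
# url_path_for(), and the per-session token store.
murfey_url = "http://localhost:8000"
instrument_name = "m12"
token = "example-token"

# Placeholder route standing in for the resolved machine_info_by_instrument path
machine_config: dict = requests.get(
    f"{murfey_url}/instruments/{instrument_name}/machine",
    headers={"Authorization": f"Bearer {token}"},
).json()

# The controller and directory watcher are then built from this one dictionary
for d in machine_config.get("create_directories", []):
    print(f"Child directory to create: {d}")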

src/murfey/server/api/instrument.py

Lines changed: 0 additions & 10 deletions

@@ -110,22 +110,12 @@ async def setup_multigrid_watcher(
     if machine_config.instrument_server_url:
         session = db.exec(select(Session).where(Session.id == session_id)).one()
         visit = session.visit
-        _config = {
-            "acquisition_software": machine_config.acquisition_software,
-            "calibrations": machine_config.calibrations,
-            "data_directories": [str(k) for k in machine_config.data_directories],
-            "create_directories": [str(k) for k in machine_config.create_directories],
-            "rsync_basepath": str(machine_config.rsync_basepath),
-            "visit": visit,
-            "default_model": str(machine_config.default_model),
-        }
         async with aiohttp.ClientSession() as clientsession:
             async with clientsession.post(
                 f"{machine_config.instrument_server_url}{url_path_for('api.router', 'setup_multigrid_watcher', session_id=session_id)}",
                 json={
                     "source": str(secure_path(watcher_spec.source / visit)),
                     "visit": visit,
-                    "configuration": _config,
                     "label": visit,
                     "instrument_name": instrument_name,
                     "skip_existing_processing": watcher_spec.skip_existing_processing,

src/murfey/util/config.py

Lines changed: 88 additions & 43 deletions

@@ -4,46 +4,62 @@
 import socket
 from functools import lru_cache
 from pathlib import Path
-from typing import Dict, List, Literal, Optional, Union
+from typing import Literal, Optional, Union
 
 import yaml
 from backports.entry_points_selectable import entry_points
 from pydantic import BaseModel, BaseSettings, Extra, validator
 
 
-class MachineConfig(BaseModel, extra=Extra.allow):  # type: ignore
-    acquisition_software: List[str]
-    calibrations: Dict[str, Dict[str, Union[dict, float]]]
-    data_directories: List[Path]
-    rsync_basepath: Path
-    default_model: Path
+class MachineConfig(BaseModel):  # type: ignore
+    """
+    Keys that describe the type of workflow conducted on the client side, and how
+    Murfey will handle its data transfer and processing
+    """
+
+    # General info --------------------------------------------------------------------
     display_name: str = ""
     instrument_name: str = ""
     image_path: Optional[Path] = None
-    software_versions: Dict[str, str] = {}
-    external_executables: Dict[str, str] = {}
-    external_executables_eer: Dict[str, str] = {}
-    external_environment: Dict[str, str] = {}
-    rsync_module: str = ""
+    machine_override: str = ""
+
+    # Hardware and software -----------------------------------------------------------
+    camera: str = "FALCON"
+    superres: bool = False
+    calibrations: dict[str, dict[str, Union[dict, float]]]
+    acquisition_software: list[str]
+    software_versions: dict[str, str] = {}
+    software_settings_output_directories: dict[str, list[str]] = {}
+    data_required_substrings: dict[str, dict[str, list[str]]] = {}
+
+    # Client side directory setup -----------------------------------------------------
+    data_directories: list[Path]
     create_directories: list[str] = ["atlas"]
-    analyse_created_directories: List[str] = []
+    analyse_created_directories: list[str] = []
     gain_reference_directory: Optional[Path] = None
     eer_fractionation_file_template: str = ""
-    processed_directory_name: str = "processed"
-    gain_directory_name: str = "processing"
-    node_creator_queue: str = "node_creator"
-    superres: bool = False
-    camera: str = "FALCON"
-    data_required_substrings: Dict[str, Dict[str, List[str]]] = {}
-    allow_removal: bool = False
+
+    # Data transfer setup -------------------------------------------------------------
+    # Rsync setup
     data_transfer_enabled: bool = True
+    rsync_url: str = ""
+    rsync_module: str = ""
+    rsync_basepath: Path
+    allow_removal: bool = False
+
+    # Upstream data download setup
+    upstream_data_directories: list[Path] = []  # Previous sessions
+    upstream_data_download_directory: Optional[Path] = None  # Set by microscope config
+    upstream_data_tiff_locations: list[str] = ["processed"]  # Location of CLEM TIFFs
+
+    # Data processing setup -----------------------------------------------------------
+    # General processing setup
     processing_enabled: bool = True
-    machine_override: str = ""
-    processed_extra_directory: str = ""
-    plugin_packages: Dict[str, Path] = {}
-    software_settings_output_directories: Dict[str, List[str]] = {}
     process_by_default: bool = True
-    recipes: Dict[str, str] = {
+    gain_directory_name: str = "processing"
+    processed_directory_name: str = "processed"
+    processed_extra_directory: str = ""
+    recipes: dict[str, str] = {
         "em-spa-bfactor": "em-spa-bfactor",
         "em-spa-class2d": "em-spa-class2d",
         "em-spa-class3d": "em-spa-class3d",
@@ -53,26 +69,41 @@ class MachineConfig(BaseModel, extra=Extra.allow):  # type: ignore
         "em-tomo-align": "em-tomo-align",
     }
 
-    # Find and download upstream directories
-    upstream_data_directories: List[Path] = []  # Previous sessions
-    upstream_data_download_directory: Optional[Path] = None  # Set by microscope config
-    upstream_data_tiff_locations: List[str] = ["processed"]  # Location of CLEM TIFFs
-
+    # Particle picking setup
+    default_model: Path
     model_search_directory: str = "processing"
     initial_model_search_directory: str = "processing/initial_model"
 
-    failure_queue: str = ""
-    instrument_server_url: str = "http://localhost:8001"
-    frontend_url: str = "http://localhost:3000"
-    murfey_url: str = "http://localhost:8000"
-    rsync_url: str = ""
+    # Data analysis plugins
+    external_executables: dict[str, str] = {}
+    external_executables_eer: dict[str, str] = {}
+    external_environment: dict[str, str] = {}
+    plugin_packages: dict[str, Path] = {}
 
+    # Server and network setup --------------------------------------------------------
+    # Configurations and URLs
     security_configuration_path: Optional[Path] = None
+    murfey_url: str = "http://localhost:8000"
+    frontend_url: str = "http://localhost:3000"
+    instrument_server_url: str = "http://localhost:8001"
 
+    # Messaging queues
+    failure_queue: str = ""
+    node_creator_queue: str = "node_creator"
     notifications_queue: str = "pato_notification"
 
+    class Config:
+        """
+        Inner class that defines this model's parsing and serialising behaviour
+        """
 
-def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]:
+        extra = Extra.allow
+        json_encoders = {
+            Path: str,
+        }
+
+
+def from_file(config_file_path: Path, instrument: str = "") -> dict[str, MachineConfig]:
     with open(config_file_path, "r") as config_stream:
         config = yaml.safe_load(config_stream)
     return {
@@ -83,22 +114,36 @@ def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, Machine
 
 
 class Security(BaseModel):
+    # Murfey database settings
     murfey_db_credentials: Path
     crypto_key: str
-    auth_key: str = ""
+    sqlalchemy_pooling: bool = True
+
+    # ISPyB settings
+    ispyb_credentials: Optional[Path] = None
+
+    # Murfey server connection settings
     auth_algorithm: str = ""
+    auth_key: str = ""
+    auth_type: Literal["password", "cookie"] = "password"
     auth_url: str = ""
-    sqlalchemy_pooling: bool = True
-    allow_origins: List[str] = ["*"]
+    cookie_key: str = ""
     session_validation: str = ""
     session_token_timeout: Optional[int] = None
-    auth_type: Literal["password", "cookie"] = "password"
-    cookie_key: str = ""
+    allow_origins: list[str] = ["*"]
+
+    # RabbitMQ settings
     rabbitmq_credentials: Path
     feedback_queue: str = "murfey_feedback"
+
+    # Graylog settings
     graylog_host: str = ""
     graylog_port: Optional[int] = None
-    ispyb_credentials: Optional[Path] = None
+
+    class Config:
+        json_encoders = {
+            Path: str,
+        }
 
     @validator("graylog_port")
     def check_port_present_if_host_is(
@@ -158,7 +203,7 @@ def get_security_config() -> Security:
 
 
 @lru_cache(maxsize=1)
-def get_machine_config(instrument_name: str = "") -> Dict[str, MachineConfig]:
+def get_machine_config(instrument_name: str = "") -> dict[str, MachineConfig]:
     machine_config = {
         "": MachineConfig(
             acquisition_software=[],
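As a usage sketch, the reorganised MachineConfig would typically be populated from a YAML file through from_file; the instrument key, paths, and field values below are hypothetical, showing only the required fields (calibrations, acquisition_software, data_directories, rsync_basepath, default_model) plus a couple of optional ones:

from pathlib import Path

from murfey.util.config import from_file

# Hypothetical m12.yaml:
#
# m12:
#   calibrations: {}
#   acquisition_software: []
#   data_directories:
#     - /local/m12/Data
#   rsync_basepath: /dls/m12/data
#   default_model: /models/default.h5
#   camera: FALCON
#   create_directories:
#     - atlas

configs = from_file(Path("m12.yaml"), instrument="m12")
machine_config = configs["m12"]
print(machine_config.rsync_basepath)  # PosixPath('/dls/m12/data')
print(machine_config.camera)          # FALCON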

src/murfey/util/instrument_models.py

Lines changed: 0 additions & 3 deletions

@@ -4,12 +4,9 @@
 
 from pydantic import BaseModel
 
-from murfey.util.config import MachineConfig
-
 
 class MultigridWatcherSpec(BaseModel):
     source: Path
-    configuration: MachineConfig
     label: str
     visit: str
     instrument_name: str

src/murfey/workflows/clem/process_raw_lifs.py

Lines changed: 24 additions & 17 deletions

@@ -3,6 +3,7 @@
 The recipe referred to here is stored on GitLab.
 """
 
+from logging import getLogger
 from pathlib import Path
 from typing import Optional
 
@@ -11,6 +12,8 @@
 except AttributeError:
     pass  # Ignore if ISPyB credentials environment variable not set
 
+logger = getLogger("murfey.workflows.clem.process_raw_lifs")
+
 
 def zocalo_cluster_request(
     file: Path,
@@ -43,24 +46,28 @@ def zocalo_cluster_request(
         # Load machine config to get the feedback queue
        feedback_queue: str = messenger.feedback_queue
 
-        # Send the message
-        # The keys under "parameters" will populate all the matching fields in {}
-        # in the processing recipe
-        messenger.send(
-            "processing_recipe",
-            {
-                "recipes": ["clem-lif-to-stack"],
-                "parameters": {
-                    # Job parameters
-                    "lif_file": f"{str(file)}",
-                    "root_folder": root_folder,
-                    # Other recipe parameters
-                    "session_dir": f"{str(session_dir)}",
-                    "session_id": session_id,
-                    "job_name": job_name,
-                    "feedback_queue": feedback_queue,
-                },
+        # Construct recipe and submit it for processing
+        recipe = {
+            "recipes": ["clem-lif-to-stack"],
+            "parameters": {
+                # Job parameters
+                "lif_file": f"{str(file)}",
+                "root_folder": root_folder,
+                # Other recipe parameters
+                "session_dir": f"{str(session_dir)}",
+                "session_id": session_id,
+                "job_name": job_name,
+                "feedback_queue": feedback_queue,
             },
+        }
+        logger.debug(
+            f"Submitting LIF processing request to {messenger.feedback_queue!r} "
+            "with the following recipe: \n"
+            f"{recipe}"
+        )
+        messenger.send(
+            queue="processing_recipe",
+            message=recipe,
             new_connection=True,
         )
     else:
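For reference, the message that now lands on the "processing_recipe" queue has the shape sketched below; the file paths, session ID, and job name are illustrative only, and "murfey_feedback" is the default feedback queue from the Security model above:

# Illustrative message shape for a hypothetical LIF file
recipe = {
    "recipes": ["clem-lif-to-stack"],
    "parameters": {
        "lif_file": "/dls/m12/data/2024/cm12345-1/images/sample.lif",
        "root_folder": "images",
        "session_dir": "/dls/m12/data/2024/cm12345-1",
        "session_id": 123,
        "job_name": "sample_lif_to_stack",
        "feedback_queue": "murfey_feedback",
    },
}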
