Commit c9721df

Move the spa flush to a workflow
1 parent 002a68b commit c9721df

File tree: 5 files changed, +395 -397 lines


pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -107,6 +107,7 @@ murfey = "murfey.client:run"
 "clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
 "clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
 "clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
+"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"

 [tool.setuptools]
 package-dir = {"" = "src"}
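
The new entry point registers the relocated SPA flush alongside the existing CLEM workflow entries. As a hedged illustration only (not code from this commit), the snippet below shows how such an entry point could be looked up and loaded with importlib.metadata; the group name "murfey.workflows" is an assumption, since the [project.entry-points] header sits above the hunk shown here, and the call signature of the loaded workflow is not shown in this section.

# Hypothetical sketch: discover and load the workflow registered in pyproject.toml.
# Assumes the Python 3.10+ entry point selection API and a group named "murfey.workflows".
from importlib.metadata import entry_points

def load_workflow(name: str):
    """Return the callable registered under the given workflow entry point name."""
    matches = entry_points(group="murfey.workflows", name=name)
    if not matches:
        raise KeyError(f"No workflow registered under {name!r}")
    # e.g. "spa.flush_spa_preprocess" resolves to
    # murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess
    return next(iter(matches)).load()

# Usage (the workflow's call signature is an assumption):
# flush = load_workflow("spa.flush_spa_preprocess")
# flush(message)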

src/murfey/server/__init__.py

Lines changed: 1 addition & 233 deletions
@@ -11,7 +11,7 @@
 from importlib.resources import files
 from pathlib import Path
 from threading import Thread
-from typing import Any, Dict, List, Literal, NamedTuple, Optional, Tuple
+from typing import Any, Dict, List, Literal, NamedTuple, Tuple

 import graypy
 import mrcfile
@@ -49,7 +49,6 @@
 import murfey.server.websocket
 import murfey.util.db as db
 from murfey.server.murfey_db import url  # murfey_db
-from murfey.server.spa.spa_metadata import register_foil_hole, register_grid_square
 from murfey.util import LogFilter
 from murfey.util.config import (
     MachineConfig,
@@ -58,16 +57,7 @@
     get_microscope,
     get_security_config,
 )
-from murfey.util.models import FoilHoleParameters, GridSquareParameters
 from murfey.util.processing_params import default_spa_parameters
-from murfey.util.spa_metadata import (
-    GridSquare,
-    _foil_hole_data,
-    _foil_hole_from_file,
-    _get_grid_square_atlas_positions,
-    _grid_square_data,
-    _grid_square_from_file,
-)
 from murfey.util.state import global_state
 from murfey.util.tomo import midpoint

@@ -1940,223 +1930,6 @@ def _register_initial_model(message: dict, _db=murfey_db, demo: bool = False):
     _db.close()


-def _grid_square_metadata_file(f: Path, grid_square: int) -> Optional[Path]:
-    """Search through metadata directories to find the required grid square dm file"""
-    raw_dir = f.parent.parent.parent
-    metadata_dirs = raw_dir.glob("metadata*")
-    for md_dir in metadata_dirs:
-        gs_path = md_dir / f"Metadata/GridSquare_{grid_square}.dm"
-        if gs_path.is_file():
-            return gs_path
-    logger.error(f"Could not determine grid square metadata path for {f}")
-    return None
-
-
-def _flush_position_analysis(
-    movie_path: Path, dcg_id: int, session_id: int
-) -> Optional[int]:
-    """Register a grid square and foil hole in the database"""
-    data_collection_group = murfey_db.exec(
-        select(db.DataCollectionGroup).where(db.DataCollectionGroup.id == dcg_id)
-    ).one()
-
-    # Work out the grid square and associated metadata file
-    grid_square = _grid_square_from_file(movie_path)
-    grid_square_metadata_file = _grid_square_metadata_file(movie_path, grid_square)
-    if grid_square_metadata_file:
-        gs = _grid_square_data(grid_square_metadata_file, grid_square)
-    else:
-        gs = GridSquare(id=grid_square)
-    if data_collection_group.atlas:
-        # If an atlas is present, work out where this grid square is on it
-        gs_pix_position = _get_grid_square_atlas_positions(
-            data_collection_group.atlas,
-            grid_square=str(grid_square),
-        )[str(grid_square)]
-        grid_square_parameters = GridSquareParameters(
-            tag=data_collection_group.tag,
-            x_location=gs_pix_position[0],
-            y_location=gs_pix_position[1],
-            x_stage_position=gs_pix_position[2],
-            y_stage_position=gs_pix_position[3],
-            readout_area_x=gs.readout_area_x,
-            readout_area_y=gs.readout_area_y,
-            thumbnail_size_x=gs.thumbnail_size_x,
-            thumbnail_size_y=gs.thumbnail_size_y,
-            height=gs_pix_position[5],
-            width=gs_pix_position[4],
-            pixel_size=gs.pixel_size,
-            image=gs.image,
-            angle=gs_pix_position[6],
-        )
-    else:
-        # Skip location analysis if no atlas
-        grid_square_parameters = GridSquareParameters(
-            tag=data_collection_group.tag,
-            readout_area_x=gs.readout_area_x,
-            readout_area_y=gs.readout_area_y,
-            thumbnail_size_x=gs.thumbnail_size_x,
-            thumbnail_size_y=gs.thumbnail_size_y,
-            pixel_size=gs.pixel_size,
-            image=gs.image,
-        )
-    # Insert or update this grid square in the database
-    register_grid_square(session_id, gs.id, grid_square_parameters, murfey_db)
-
-    # Find the foil hole info and register it
-    foil_hole = _foil_hole_from_file(movie_path)
-    if grid_square_metadata_file:
-        fh = _foil_hole_data(
-            grid_square_metadata_file,
-            foil_hole,
-            grid_square,
-        )
-        foil_hole_parameters = FoilHoleParameters(
-            tag=data_collection_group.tag,
-            name=foil_hole,
-            x_location=fh.x_location,
-            y_location=fh.y_location,
-            x_stage_position=fh.x_stage_position,
-            y_stage_position=fh.y_stage_position,
-            readout_area_x=fh.readout_area_x,
-            readout_area_y=fh.readout_area_y,
-            thumbnail_size_x=fh.thumbnail_size_x,
-            thumbnail_size_y=fh.thumbnail_size_y,
-            pixel_size=fh.pixel_size,
-            image=fh.image,
-            diameter=fh.diameter,
-        )
-    else:
-        foil_hole_parameters = FoilHoleParameters(
-            tag=data_collection_group.tag,
-            name=foil_hole,
-        )
-    # Insert or update this foil hole in the database
-    register_foil_hole(session_id, gs.id, foil_hole_parameters, murfey_db)
-    return foil_hole
-
-
-def _flush_spa_preprocessing(message: dict):
-    session_id = message["session_id"]
-    stashed_files = murfey_db.exec(
-        select(db.PreprocessStash)
-        .where(db.PreprocessStash.session_id == session_id)
-        .where(db.PreprocessStash.tag == message["tag"])
-    ).all()
-    if not stashed_files:
-        return None
-    instrument_name = (
-        murfey_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
-        .one()
-        .instrument_name
-    )
-    machine_config = get_machine_config(instrument_name=instrument_name)[
-        instrument_name
-    ]
-    recipe_name = machine_config.recipes.get("em-spa-preprocess", "em-spa-preprocess")
-    collected_ids = murfey_db.exec(
-        select(
-            db.DataCollectionGroup,
-            db.DataCollection,
-            db.ProcessingJob,
-            db.AutoProcProgram,
-        )
-        .where(db.DataCollectionGroup.session_id == session_id)
-        .where(db.DataCollectionGroup.tag == message["tag"])
-        .where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
-        .where(db.ProcessingJob.dc_id == db.DataCollection.id)
-        .where(db.AutoProcProgram.pj_id == db.ProcessingJob.id)
-        .where(db.ProcessingJob.recipe == recipe_name)
-    ).one()
-    params = murfey_db.exec(
-        select(db.SPARelionParameters, db.SPAFeedbackParameters)
-        .where(db.SPARelionParameters.pj_id == collected_ids[2].id)
-        .where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id)
-    ).one()
-    proc_params = params[0]
-    feedback_params = params[1]
-    if not proc_params:
-        logger.warning(
-            f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
-        )
-        raise ValueError(
-            "No processing parameters were found in the database when flushing SPA preprocessing"
-        )
-
-    murfey_ids = _murfey_id(
-        collected_ids[3].id,
-        murfey_db,
-        number=2 * len(stashed_files),
-        close=False,
-    )
-    if feedback_params.picker_murfey_id is None:
-        feedback_params.picker_murfey_id = murfey_ids[1]
-        murfey_db.add(feedback_params)
-
-    for i, f in enumerate(stashed_files):
-        if f.foil_hole_id:
-            foil_hole_id = f.foil_hole_id
-        else:
-            # Register grid square and foil hole if not present
-            foil_hole_id = _flush_position_analysis(
-                movie_path=f.file_path,
-                dcg_id=collected_ids[0].id,
-                session_id=session_id,
-            )
-
-        mrcp = Path(f.mrc_out)
-        ppath = Path(f.file_path)
-        if not mrcp.parent.exists():
-            mrcp.parent.mkdir(parents=True)
-        movie = db.Movie(
-            murfey_id=murfey_ids[2 * i],
-            path=f.file_path,
-            image_number=f.image_number,
-            tag=f.tag,
-            foil_hole_id=foil_hole_id,
-        )
-        murfey_db.add(movie)
-        zocalo_message: dict = {
-            "recipes": [recipe_name],
-            "parameters": {
-                "node_creator_queue": machine_config.node_creator_queue,
-                "dcid": collected_ids[1].id,
-                "kv": proc_params.voltage,
-                "autoproc_program_id": collected_ids[3].id,
-                "movie": f.file_path,
-                "mrc_out": f.mrc_out,
-                "pixel_size": proc_params.angpix,
-                "image_number": f.image_number,
-                "microscope": get_microscope(),
-                "mc_uuid": murfey_ids[2 * i],
-                "ft_bin": proc_params.motion_corr_binning,
-                "fm_dose": proc_params.dose_per_frame,
-                "gain_ref": proc_params.gain_ref,
-                "picker_uuid": murfey_ids[2 * i + 1],
-                "session_id": session_id,
-                "particle_diameter": proc_params.particle_diameter or 0,
-                "fm_int_file": f.eer_fractionation_file,
-                "do_icebreaker_jobs": default_spa_parameters.do_icebreaker_jobs,
-                "foil_hole_id": foil_hole_id,
-            },
-        }
-        if _transport_object:
-            zocalo_message["parameters"][
-                "feedback_queue"
-            ] = _transport_object.feedback_queue
-            _transport_object.send(
-                "processing_recipe", zocalo_message, new_connection=True
-            )
-            murfey_db.delete(f)
-        else:
-            logger.error(
-                f"Pre-processing was requested for {ppath.name} but no Zocalo transport object was found"
-            )
-    murfey_db.commit()
-    murfey_db.close()
-    return None
-
-
 def _flush_tomography_preprocessing(message: dict):
     session_id = message["session_id"]
     instrument_name = (
@@ -2956,11 +2729,6 @@ def feedback_callback(header: dict, message: dict) -> None:
         if _transport_object:
            _transport_object.transport.ack(header)
        return None
-    elif message["register"] == "flush_spa_preprocess":
-        _flush_spa_preprocessing(message)
-        if _transport_object:
-            _transport_object.transport.ack(header)
-        return None
     elif message["register"] == "spa_processing_parameters":
         session_id = message["session_id"]
         collected_ids = murfey_db.exec(
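
With the in-module handler gone, the feedback_callback branch removed above presumably has a counterpart in one of the three changed files not reproduced in this section. As a hedged sketch only (the handler name, the "murfey.workflows" group name, and the flush(message) call signature are assumptions, not taken from this commit), a "flush_spa_preprocess" register message could now be routed through the entry point instead of the deleted _flush_spa_preprocessing helper:

# Hypothetical sketch: dispatch the message via the entry point added in pyproject.toml.
from importlib.metadata import entry_points
from typing import Any, Dict, Optional

def handle_flush_spa_preprocess(
    header: Dict[str, Any], message: Dict[str, Any], transport: Optional[Any] = None
) -> None:
    # Expect exactly one registered workflow under this name; raises ValueError otherwise.
    (ep,) = entry_points(group="murfey.workflows", name="spa.flush_spa_preprocess")
    flush = ep.load()
    flush(message)  # call signature assumed, mirroring the removed _flush_spa_preprocessing(message)
    if transport is not None:
        transport.transport.ack(header)  # acknowledge the message as the removed branch did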
