Skip to content

Commit 27708e2

Browse files
authored
Run preprocessing flushes in separate threads (#284)
1 parent 8894976 commit 27708e2

File tree

2 files changed

+148
-108
lines changed

2 files changed

+148
-108
lines changed

src/murfey/server/__init__.py

Lines changed: 147 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import os
66
import time
77
from datetime import datetime
8-
from functools import partial, singledispatch
8+
from functools import partial, singledispatch, wraps
99
from pathlib import Path
1010
from threading import Thread
11-
from typing import Any, Dict, List, NamedTuple, Tuple
11+
from typing import Any, Callable, Dict, List, NamedTuple, Tuple
1212

1313
import numpy as np
1414
import uvicorn
@@ -85,6 +85,28 @@ class JobIDs(NamedTuple):
8585
client_id: int
8686

8787

88+
def record_failure(
89+
f: Callable, record_queue: str = "", is_callback: bool = True
90+
) -> Callable:
91+
@wraps(f)
92+
def wrapper(*args, **kwargs):
93+
try:
94+
return f(*args, **kwargs)
95+
except Exception:
96+
logger.warning(f"Call to {f} failed", exc_info=True)
97+
if _transport_object and is_callback:
98+
if not record_queue:
99+
machine_config = get_machine_config()
100+
record_queue = (
101+
machine_config.failure_queue
102+
or f"dlq.{machine_config.feedback_queue}"
103+
)
104+
_transport_object.send(record_queue, args[0], new_connection=True)
105+
return None
106+
107+
return wrapper
108+
109+
88110
def sanitise(in_string: str) -> str:
89111
return in_string.replace("\r\n", "").replace("\n", "")
90112

@@ -1672,6 +1694,116 @@ def _register_initial_model(message: dict, _db=murfey_db, demo: bool = False):
16721694
_db.close()
16731695

16741696

1697+
@record_failure
1698+
def _flush_spa_preprocessing(message: dict):
1699+
session_id = (
1700+
murfey_db.exec(
1701+
select(db.ClientEnvironment).where(
1702+
db.ClientEnvironment.client_id == message["client_id"]
1703+
)
1704+
)
1705+
.one()
1706+
.session_id
1707+
)
1708+
stashed_files = murfey_db.exec(
1709+
select(db.PreprocessStash)
1710+
.where(db.PreprocessStash.session_id == session_id)
1711+
.where(db.PreprocessStash.tag == message["tag"])
1712+
).all()
1713+
if not stashed_files:
1714+
return None
1715+
machine_config = get_machine_config()
1716+
collected_ids = murfey_db.exec(
1717+
select(
1718+
db.DataCollectionGroup,
1719+
db.DataCollection,
1720+
db.ProcessingJob,
1721+
db.AutoProcProgram,
1722+
)
1723+
.where(db.DataCollectionGroup.session_id == session_id)
1724+
.where(db.DataCollectionGroup.tag == message["tag"])
1725+
.where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
1726+
.where(db.ProcessingJob.dc_id == db.DataCollection.id)
1727+
.where(db.AutoProcProgram.pj_id == db.ProcessingJob.id)
1728+
.where(db.ProcessingJob.recipe == "em-spa-preprocess")
1729+
).one()
1730+
params = murfey_db.exec(
1731+
select(db.SPARelionParameters, db.SPAFeedbackParameters)
1732+
.where(db.SPARelionParameters.pj_id == collected_ids[2].id)
1733+
.where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id)
1734+
).one()
1735+
proc_params = params[0]
1736+
feedback_params = params[1]
1737+
if not proc_params:
1738+
logger.warning(
1739+
f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
1740+
)
1741+
raise ValueError(
1742+
"No processing parameters were found in the database when flushing SPA preprocessing"
1743+
)
1744+
1745+
murfey_ids = _murfey_id(
1746+
collected_ids[3].id,
1747+
murfey_db,
1748+
number=2 * len(stashed_files),
1749+
close=False,
1750+
)
1751+
if feedback_params.picker_murfey_id is None:
1752+
feedback_params.picker_murfey_id = murfey_ids[1]
1753+
murfey_db.add(feedback_params)
1754+
1755+
for i, f in enumerate(stashed_files):
1756+
mrcp = Path(f.mrc_out)
1757+
ppath = Path(f.file_path)
1758+
if not mrcp.parent.exists():
1759+
mrcp.parent.mkdir(parents=True)
1760+
movie = db.Movie(
1761+
murfey_id=murfey_ids[2 * i],
1762+
path=f.file_path,
1763+
image_number=f.image_number,
1764+
tag=f.tag,
1765+
foil_hole_id=f.foil_hole_id,
1766+
)
1767+
murfey_db.add(movie)
1768+
zocalo_message = {
1769+
"recipes": ["em-spa-preprocess"],
1770+
"parameters": {
1771+
"feedback_queue": machine_config.feedback_queue,
1772+
"node_creator_queue": machine_config.node_creator_queue,
1773+
"dcid": collected_ids[1].id,
1774+
"kv": proc_params.voltage,
1775+
"autoproc_program_id": collected_ids[3].id,
1776+
"movie": f.file_path,
1777+
"mrc_out": f.mrc_out,
1778+
"pix_size": proc_params.angpix,
1779+
"image_number": f.image_number,
1780+
"microscope": get_microscope(),
1781+
"mc_uuid": murfey_ids[2 * i],
1782+
"ft_bin": proc_params.motion_corr_binning,
1783+
"fm_dose": proc_params.dose_per_frame,
1784+
"gain_ref": proc_params.gain_ref,
1785+
"picker_uuid": murfey_ids[2 * i + 1],
1786+
"session_id": session_id,
1787+
"particle_diameter": proc_params.particle_diameter or 0,
1788+
"fm_int_file": f.eer_fractionation_file,
1789+
"do_icebreaker_jobs": default_spa_parameters.do_icebreaker_jobs,
1790+
},
1791+
}
1792+
if _transport_object:
1793+
_transport_object.send(
1794+
"processing_recipe", zocalo_message, new_connection=True
1795+
)
1796+
murfey_db.delete(f)
1797+
else:
1798+
logger.error(
1799+
f"Pre-processing was requested for {ppath.name} but no Zocalo transport object was found"
1800+
)
1801+
murfey_db.commit()
1802+
murfey_db.close()
1803+
return None
1804+
1805+
1806+
@record_failure
16751807
def _flush_tomography_preprocessing(message: dict):
16761808
machine_config = get_machine_config()
16771809
session_id = (
@@ -2363,117 +2495,24 @@ def feedback_callback(header: dict, message: dict) -> None:
23632495
_transport_object.transport.ack(header)
23642496
return None
23652497
elif message["register"] == "flush_tomography_preprocess":
2366-
_flush_tomography_preprocessing(message)
2498+
thread = Thread(
2499+
target=_flush_tomography_preprocessing,
2500+
args=message,
2501+
name="tomography_flush",
2502+
daemon=True,
2503+
)
2504+
thread.start()
23672505
if _transport_object:
23682506
_transport_object.transport.ack(header)
23692507
return None
23702508
elif message["register"] == "flush_spa_preprocess":
2371-
session_id = (
2372-
murfey_db.exec(
2373-
select(db.ClientEnvironment).where(
2374-
db.ClientEnvironment.client_id == message["client_id"]
2375-
)
2376-
)
2377-
.one()
2378-
.session_id
2509+
thread = Thread(
2510+
target=_flush_spa_preprocessing,
2511+
args=message,
2512+
name="spa_flush",
2513+
daemon=True,
23792514
)
2380-
stashed_files = murfey_db.exec(
2381-
select(db.PreprocessStash)
2382-
.where(db.PreprocessStash.session_id == session_id)
2383-
.where(db.PreprocessStash.tag == message["tag"])
2384-
).all()
2385-
if not stashed_files:
2386-
if _transport_object:
2387-
_transport_object.transport.ack(header)
2388-
return None
2389-
machine_config = get_machine_config()
2390-
collected_ids = murfey_db.exec(
2391-
select(
2392-
db.DataCollectionGroup,
2393-
db.DataCollection,
2394-
db.ProcessingJob,
2395-
db.AutoProcProgram,
2396-
)
2397-
.where(db.DataCollectionGroup.session_id == session_id)
2398-
.where(db.DataCollectionGroup.tag == message["tag"])
2399-
.where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
2400-
.where(db.ProcessingJob.dc_id == db.DataCollection.id)
2401-
.where(db.AutoProcProgram.pj_id == db.ProcessingJob.id)
2402-
.where(db.ProcessingJob.recipe == "em-spa-preprocess")
2403-
).one()
2404-
params = murfey_db.exec(
2405-
select(db.SPARelionParameters, db.SPAFeedbackParameters)
2406-
.where(db.SPARelionParameters.pj_id == collected_ids[2].id)
2407-
.where(db.SPAFeedbackParameters.pj_id == db.SPARelionParameters.pj_id)
2408-
).one()
2409-
proc_params = params[0]
2410-
feedback_params = params[1]
2411-
if not proc_params:
2412-
logger.warning(
2413-
f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
2414-
)
2415-
if _transport_object:
2416-
_transport_object.transport.nack(header)
2417-
return None
2418-
2419-
murfey_ids = _murfey_id(
2420-
collected_ids[3].id,
2421-
murfey_db,
2422-
number=2 * len(stashed_files),
2423-
close=False,
2424-
)
2425-
if feedback_params.picker_murfey_id is None:
2426-
feedback_params.picker_murfey_id = murfey_ids[1]
2427-
murfey_db.add(feedback_params)
2428-
2429-
for i, f in enumerate(stashed_files):
2430-
mrcp = Path(f.mrc_out)
2431-
ppath = Path(f.file_path)
2432-
if not mrcp.parent.exists():
2433-
mrcp.parent.mkdir(parents=True)
2434-
movie = db.Movie(
2435-
murfey_id=murfey_ids[2 * i],
2436-
path=f.file_path,
2437-
image_number=f.image_number,
2438-
tag=f.tag,
2439-
foil_hole_id=f.foil_hole_id,
2440-
)
2441-
murfey_db.add(movie)
2442-
zocalo_message = {
2443-
"recipes": ["em-spa-preprocess"],
2444-
"parameters": {
2445-
"feedback_queue": machine_config.feedback_queue,
2446-
"node_creator_queue": machine_config.node_creator_queue,
2447-
"dcid": collected_ids[1].id,
2448-
"kv": proc_params.voltage,
2449-
"autoproc_program_id": collected_ids[3].id,
2450-
"movie": f.file_path,
2451-
"mrc_out": f.mrc_out,
2452-
"pix_size": proc_params.angpix,
2453-
"image_number": f.image_number,
2454-
"microscope": get_microscope(),
2455-
"mc_uuid": murfey_ids[2 * i],
2456-
"ft_bin": proc_params.motion_corr_binning,
2457-
"fm_dose": proc_params.dose_per_frame,
2458-
"gain_ref": proc_params.gain_ref,
2459-
"picker_uuid": murfey_ids[2 * i + 1],
2460-
"session_id": session_id,
2461-
"particle_diameter": proc_params.particle_diameter or 0,
2462-
"fm_int_file": f.eer_fractionation_file,
2463-
"do_icebreaker_jobs": default_spa_parameters.do_icebreaker_jobs,
2464-
},
2465-
}
2466-
if _transport_object:
2467-
_transport_object.send(
2468-
"processing_recipe", zocalo_message, new_connection=True
2469-
)
2470-
murfey_db.delete(f)
2471-
else:
2472-
logger.error(
2473-
f"Pre-processing was requested for {ppath.name} but no Zocalo transport object was found"
2474-
)
2475-
murfey_db.commit()
2476-
murfey_db.close()
2515+
thread.start()
24772516
if _transport_object:
24782517
_transport_object.transport.ack(header)
24792518
return None

src/murfey/server/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class MachineConfig(BaseModel):
4444
upstream_data_directories: List[Path] = []
4545
upstream_data_download_directory: Optional[Path] = None
4646
upstream_data_tiff_locations: List[str] = ["processed"]
47+
failure_queue: str = ""
4748

4849

4950
def from_file(config_file_path: Path, microscope: str) -> MachineConfig:

0 commit comments

Comments
 (0)