Skip to content

Commit 282795d

Browse files
committed
Tomo picking feedback workflow
1 parent 5b71bd8 commit 282795d

File tree

2 files changed

+181
-0
lines changed

2 files changed

+181
-0
lines changed

src/murfey/util/processing_params.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,11 @@ class SPAParameters(BaseModel):
7979

8080

8181
default_spa_parameters = SPAParameters()
82+
83+
84+
class TomographyParameters(BaseModel):
85+
batch_size_2d: int = 50000
86+
nr_picks_before_diameter: int = 10000
87+
88+
89+
default_tomo_parameters = TomographyParameters()
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
from logging import getLogger
2+
3+
import numpy as np
4+
from sqlalchemy import func
5+
from sqlmodel import Session, select
6+
7+
from murfey.server import _transport_object
8+
from murfey.server.feedback import _app_id, _murfey_id
9+
from murfey.util.config import get_machine_config
10+
from murfey.util.db import AutoProcProgram, DataCollection, ParticleSizes, ProcessingJob
11+
from murfey.util.db import Session as MurfeySession
12+
from murfey.util.db import TomogramPicks, TomographyProcessingParameters
13+
from murfey.util.processing_params import default_tomo_parameters
14+
15+
logger = getLogger("murfey.workflows.tomo.feedback")
16+
17+
18+
def _pj_id_tomo_classification(app_id: int, recipe: str, _db) -> int:
19+
dcg_id = (
20+
_db.exec(
21+
select(AutoProcProgram, ProcessingJob, DataCollection)
22+
.where(AutoProcProgram.id == app_id)
23+
.where(AutoProcProgram.pj_id == ProcessingJob.id)
24+
.where(ProcessingJob.dc_id == DataCollection.id)
25+
)
26+
.one()[2]
27+
.dcg_id
28+
)
29+
pj_id = (
30+
_db.exec(
31+
select(ProcessingJob, DataCollection)
32+
.where(DataCollection.dcg_id == dcg_id)
33+
.where(ProcessingJob.dc_id == DataCollection.id)
34+
.where(ProcessingJob.recipe == recipe)
35+
)
36+
.one()[0]
37+
.id
38+
)
39+
return pj_id
40+
41+
42+
def _register_picked_tomogram_use_diameter(message: dict, _db: Session):
43+
"""Received picked particles from the tomogram autopick service"""
44+
# Add this message to the table of seen messages
45+
pj_id = _pj_id_tomo_classification(message["program_id"], "em-tomo-class2d", _db)
46+
47+
pick_params = TomogramPicks(
48+
pj_id=pj_id,
49+
tomogram=message["tomogram"],
50+
cbox_3d=message["cbox_3d"],
51+
particle_count=message["particle_count"],
52+
tomogram_pixel_size=message["pixel_size"],
53+
)
54+
_db.add(pick_params)
55+
_db.commit()
56+
_db.close()
57+
58+
picking_db_len = _db.exec(
59+
select(func.count(ParticleSizes.id)).where(ParticleSizes.pj_id == pj_id)
60+
).one()
61+
if picking_db_len > default_tomo_parameters.nr_picks_before_diameter:
62+
# If there are enough particles to get a diameter
63+
instrument_name = (
64+
_db.exec(
65+
select(MurfeySession).where(MurfeySession.id == message["session_id"])
66+
)
67+
.one()
68+
.instrument_name
69+
)
70+
machine_config = get_machine_config(instrument_name=instrument_name)[
71+
instrument_name
72+
]
73+
tomo_params = _db.exec(
74+
select(TomographyProcessingParameters).where(
75+
TomographyProcessingParameters.pj_id == pj_id
76+
)
77+
).one()
78+
79+
particle_diameter = tomo_params.particle_diameter
80+
81+
if not particle_diameter:
82+
# If the diameter has not been calculated then find it
83+
picking_db = _db.exec(
84+
select(ParticleSizes.particle_size).where(ParticleSizes.pj_id == pj_id)
85+
).all()
86+
particle_diameter = np.quantile(list(picking_db), 0.75)
87+
tomo_params.particle_diameter = particle_diameter
88+
_db.add(tomo_params)
89+
_db.commit()
90+
91+
tomo_pick_db = _db.exec(
92+
select(TomogramPicks).where(TomogramPicks.pj_id == pj_id)
93+
).all()
94+
for saved_message in tomo_pick_db:
95+
# Send on all saved messages to extraction
96+
class_uuids = {
97+
str(i + 1): m
98+
for i, m in enumerate(
99+
_murfey_id(_app_id(pj_id, _db), _db, number=50)
100+
)
101+
}
102+
class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0]
103+
_db.expunge(saved_message)
104+
zocalo_message: dict = {
105+
"parameters": {
106+
"tomogram": saved_message.tomogram,
107+
"cbox_3d": saved_message.cbox_3d,
108+
"pixel_size": saved_message.tomogram_pixel_size,
109+
"particle_diameter": particle_diameter,
110+
"kv": tomo_params.voltage,
111+
"node_creator_queue": machine_config.node_creator_queue,
112+
"session_id": message["session_id"],
113+
"autoproc_program_id": _app_id(pj_id, _db),
114+
"batch_size": default_tomo_parameters.batch_size_2d,
115+
"picker_id": None,
116+
"class2d_grp_uuid": class2d_grp_uuid,
117+
"class_uuids": class_uuids,
118+
},
119+
"recipes": ["em-spa-extract"],
120+
}
121+
if _transport_object:
122+
zocalo_message["parameters"][
123+
"feedback_queue"
124+
] = _transport_object.feedback_queue
125+
_transport_object.send(
126+
"processing_recipe", zocalo_message, new_connection=True
127+
)
128+
else:
129+
# If the diameter is known then just send the new message
130+
particle_diameter = tomo_params.particle_diameter
131+
class_uuids = {
132+
str(i + 1): m
133+
for i, m in enumerate(_murfey_id(_app_id(pj_id, _db), _db, number=50))
134+
}
135+
class2d_grp_uuid = _murfey_id(_app_id(pj_id, _db), _db)[0]
136+
zocalo_message = {
137+
"parameters": {
138+
"tomogram": message["tomogram"],
139+
"cbox_3d": message["cbox_3d"],
140+
"pixel_size": message["pixel_size"],
141+
"particle_diameter": particle_diameter,
142+
"kv": tomo_params.voltage,
143+
"node_creator_queue": machine_config.node_creator_queue,
144+
"session_id": message["session_id"],
145+
"autoproc_program_id": _app_id(pj_id, _db),
146+
"batch_size": default_tomo_parameters.batch_size_2d,
147+
"picker_id": None,
148+
"class2d_grp_uuid": class2d_grp_uuid,
149+
"class_uuids": class_uuids,
150+
},
151+
"recipes": ["em-tomo-class2d"],
152+
}
153+
if _transport_object:
154+
zocalo_message["parameters"][
155+
"feedback_queue"
156+
] = _transport_object.feedback_queue
157+
_transport_object.send(
158+
"processing_recipe", zocalo_message, new_connection=True
159+
)
160+
else:
161+
# If not enough particles then save the new sizes
162+
particle_list = message.get("particle_diameters")
163+
assert isinstance(particle_list, list)
164+
for particle in particle_list:
165+
new_particle = ParticleSizes(pj_id=pj_id, particle_size=particle)
166+
_db.add(new_particle)
167+
_db.commit()
168+
_db.close()
169+
170+
171+
def particles_tomogram(message: dict, murfey_db: Session) -> bool:
172+
_register_picked_tomogram_use_diameter(message, murfey_db)
173+
return True

0 commit comments

Comments
 (0)