Skip to content

Commit cb61821

Browse files
committed
Add logic for determining when a notification should be raised
There are some hard coded parameters in here at the moment
1 parent c122d56 commit cb61821

File tree

5 files changed

+411
-319
lines changed

5 files changed

+411
-319
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ murfey = "murfey.client:run"
108108
"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
109109
"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
110110
"pato" = "murfey.workflows.notifications:notification_setup"
111+
"picked_particles" = "murfey.wrokflows.spa.picking:particles_picked"
111112
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"
112113

113114
[tool.setuptools]

src/murfey/server/__init__.py

Lines changed: 0 additions & 317 deletions
Original file line numberDiff line numberDiff line change
@@ -573,298 +573,6 @@ def _get_spa_params(
573573
return relion_params, feedback_params
574574

575575

576-
def _register_picked_particles_use_diameter(
577-
message: dict, _db=murfey_db, demo: bool = False
578-
):
579-
"""Received picked particles from the autopick service"""
580-
# Add this message to the table of seen messages
581-
params_to_forward = message.get("extraction_parameters")
582-
assert isinstance(params_to_forward, dict)
583-
pj_id = _pj_id(message["program_id"], _db)
584-
ctf_params = db.CtfParameters(
585-
pj_id=pj_id,
586-
micrographs_file=params_to_forward["micrographs_file"],
587-
extract_file=params_to_forward["extract_file"],
588-
coord_list_file=params_to_forward["coord_list_file"],
589-
ctf_image=params_to_forward["ctf_values"]["CtfImage"],
590-
ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
591-
ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
592-
defocus_u=params_to_forward["ctf_values"]["DefocusU"],
593-
defocus_v=params_to_forward["ctf_values"]["DefocusV"],
594-
defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
595-
)
596-
_db.add(ctf_params)
597-
_db.commit()
598-
_db.close()
599-
600-
picking_db_len = _db.exec(
601-
select(func.count(db.ParticleSizes.id)).where(db.ParticleSizes.pj_id == pj_id)
602-
).one()
603-
if picking_db_len > default_spa_parameters.nr_picks_before_diameter:
604-
# If there are enough particles to get a diameter
605-
instrument_name = (
606-
_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
607-
.one()
608-
.instrument_name
609-
)
610-
machine_config = get_machine_config(instrument_name=instrument_name)[
611-
instrument_name
612-
]
613-
relion_params = _db.exec(
614-
select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
615-
).one()
616-
relion_options = dict(relion_params)
617-
feedback_params = _db.exec(
618-
select(db.SPAFeedbackParameters).where(
619-
db.SPAFeedbackParameters.pj_id == pj_id
620-
)
621-
).one()
622-
623-
particle_diameter = relion_params.particle_diameter
624-
625-
if feedback_params.picker_ispyb_id is None:
626-
if demo or not _transport_object:
627-
feedback_params.picker_ispyb_id = 1000
628-
else:
629-
assert feedback_params.picker_murfey_id is not None
630-
feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
631-
message["program_id"], feedback_params.picker_murfey_id
632-
)
633-
if feedback_params.picker_ispyb_id is not None:
634-
_flush_class2d(message["session_id"], message["program_id"], _db)
635-
_db.add(feedback_params)
636-
_db.commit()
637-
selection_stash = _db.exec(
638-
select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
639-
).all()
640-
for s in selection_stash:
641-
_register_class_selection(
642-
{
643-
"session_id": s.session_id,
644-
"class_selection_score": s.class_selection_score or 0,
645-
},
646-
_db=_db,
647-
demo=demo,
648-
)
649-
_db.delete(s)
650-
_db.commit()
651-
652-
if not particle_diameter:
653-
# If the diameter has not been calculated then find it
654-
picking_db = _db.exec(
655-
select(db.ParticleSizes.particle_size).where(
656-
db.ParticleSizes.pj_id == pj_id
657-
)
658-
).all()
659-
particle_diameter = np.quantile(list(picking_db), 0.75)
660-
relion_params.particle_diameter = particle_diameter
661-
_db.add(relion_params)
662-
_db.commit()
663-
664-
ctf_db = _db.exec(
665-
select(db.CtfParameters).where(db.CtfParameters.pj_id == pj_id)
666-
).all()
667-
for saved_message in ctf_db:
668-
# Send on all saved messages to extraction
669-
_db.expunge(saved_message)
670-
zocalo_message: dict = {
671-
"parameters": {
672-
"micrographs_file": saved_message.micrographs_file,
673-
"coord_list_file": saved_message.coord_list_file,
674-
"output_file": saved_message.extract_file,
675-
"pixel_size": (
676-
relion_options["angpix"]
677-
* relion_options["motion_corr_binning"]
678-
),
679-
"ctf_image": saved_message.ctf_image,
680-
"ctf_max_resolution": saved_message.ctf_max_resolution,
681-
"ctf_figure_of_merit": saved_message.ctf_figure_of_merit,
682-
"defocus_u": saved_message.defocus_u,
683-
"defocus_v": saved_message.defocus_v,
684-
"defocus_angle": saved_message.defocus_angle,
685-
"particle_diameter": particle_diameter,
686-
"downscale": relion_options["downscale"],
687-
"kv": relion_options["voltage"],
688-
"node_creator_queue": machine_config.node_creator_queue,
689-
"session_id": message["session_id"],
690-
"autoproc_program_id": _app_id(
691-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"),
692-
_db,
693-
),
694-
"batch_size": default_spa_parameters.batch_size_2d,
695-
},
696-
"recipes": ["em-spa-extract"],
697-
}
698-
if _transport_object:
699-
zocalo_message["parameters"][
700-
"feedback_queue"
701-
] = _transport_object.feedback_queue
702-
_transport_object.send(
703-
"processing_recipe", zocalo_message, new_connection=True
704-
)
705-
else:
706-
# If the diameter is known then just send the new message
707-
particle_diameter = relion_params.particle_diameter
708-
zocalo_message = {
709-
"parameters": {
710-
"micrographs_file": params_to_forward["micrographs_file"],
711-
"coord_list_file": params_to_forward["coord_list_file"],
712-
"output_file": params_to_forward["extract_file"],
713-
"pixel_size": (
714-
relion_options["angpix"] * relion_options["motion_corr_binning"]
715-
),
716-
"ctf_image": params_to_forward["ctf_values"]["CtfImage"],
717-
"ctf_max_resolution": params_to_forward["ctf_values"][
718-
"CtfMaxResolution"
719-
],
720-
"ctf_figure_of_merit": params_to_forward["ctf_values"][
721-
"CtfFigureOfMerit"
722-
],
723-
"defocus_u": params_to_forward["ctf_values"]["DefocusU"],
724-
"defocus_v": params_to_forward["ctf_values"]["DefocusV"],
725-
"defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
726-
"particle_diameter": particle_diameter,
727-
"downscale": relion_options["downscale"],
728-
"kv": relion_options["voltage"],
729-
"node_creator_queue": machine_config.node_creator_queue,
730-
"session_id": message["session_id"],
731-
"autoproc_program_id": _app_id(
732-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
733-
),
734-
"batch_size": default_spa_parameters.batch_size_2d,
735-
},
736-
"recipes": ["em-spa-extract"],
737-
}
738-
if _transport_object:
739-
zocalo_message["parameters"][
740-
"feedback_queue"
741-
] = _transport_object.feedback_queue
742-
_transport_object.send(
743-
"processing_recipe", zocalo_message, new_connection=True
744-
)
745-
if demo:
746-
_register_incomplete_2d_batch(
747-
{
748-
"session_id": message["session_id"],
749-
"program_id": message["program_id"],
750-
"class2d_message": {
751-
"particles_file": "Select/job009/particles_split_1.star",
752-
"class2d_dir": "Class2D",
753-
"batch_size": 50000,
754-
},
755-
},
756-
_db=_db,
757-
demo=demo,
758-
)
759-
760-
else:
761-
# If not enough particles then save the new sizes
762-
particle_list = message.get("particle_diameters")
763-
assert isinstance(particle_list, list)
764-
for particle in particle_list:
765-
new_particle = db.ParticleSizes(pj_id=pj_id, particle_size=particle)
766-
_db.add(new_particle)
767-
_db.commit()
768-
_db.close()
769-
770-
771-
def _register_picked_particles_use_boxsize(message: dict, _db=murfey_db):
772-
"""Received picked particles from the autopick service"""
773-
# Add this message to the table of seen messages
774-
params_to_forward = message.get("extraction_parameters")
775-
assert isinstance(params_to_forward, dict)
776-
777-
instrument_name = (
778-
_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
779-
.one()
780-
.instrument_name
781-
)
782-
machine_config = get_machine_config(instrument_name=instrument_name)[
783-
instrument_name
784-
]
785-
pj_id = _pj_id(message["program_id"], _db)
786-
ctf_params = db.CtfParameters(
787-
pj_id=pj_id,
788-
micrographs_file=params_to_forward["micrographs_file"],
789-
coord_list_file=params_to_forward["coord_list_file"],
790-
extract_file=params_to_forward["extract_file"],
791-
ctf_image=params_to_forward["ctf_values"]["CtfImage"],
792-
ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
793-
ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
794-
defocus_u=params_to_forward["ctf_values"]["DefocusU"],
795-
defocus_v=params_to_forward["ctf_values"]["DefocusV"],
796-
defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
797-
)
798-
_db.add(ctf_params)
799-
_db.commit()
800-
_db.close()
801-
802-
# Set particle diameter as zero and send box sizes
803-
relion_params = _db.exec(
804-
select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
805-
).one()
806-
feedback_params = _db.exec(
807-
select(db.SPAFeedbackParameters).where(db.SPAFeedbackParameters.pj_id == pj_id)
808-
).one()
809-
810-
if feedback_params.picker_ispyb_id is None and _transport_object:
811-
assert feedback_params.picker_murfey_id is not None
812-
feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
813-
message["program_id"], feedback_params.picker_murfey_id
814-
)
815-
if feedback_params.picker_ispyb_id is not None:
816-
_flush_class2d(message["session_id"], message["program_id"], _db)
817-
_db.add(feedback_params)
818-
_db.commit()
819-
selection_stash = _db.exec(
820-
select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
821-
).all()
822-
for s in selection_stash:
823-
_register_class_selection(
824-
{
825-
"session_id": s.session_id,
826-
"class_selection_score": s.class_selection_score or 0,
827-
},
828-
_db=_db,
829-
)
830-
_db.delete(s)
831-
_db.commit()
832-
833-
# Send the message to extraction with the box sizes
834-
zocalo_message: dict = {
835-
"parameters": {
836-
"micrographs_file": params_to_forward["micrographs_file"],
837-
"coord_list_file": params_to_forward["coord_list_file"],
838-
"output_file": params_to_forward["extract_file"],
839-
"pixel_size": relion_params.angpix * relion_params.motion_corr_binning,
840-
"ctf_image": params_to_forward["ctf_values"]["CtfImage"],
841-
"ctf_max_resolution": params_to_forward["ctf_values"]["CtfMaxResolution"],
842-
"ctf_figure_of_merit": params_to_forward["ctf_values"]["CtfFigureOfMerit"],
843-
"defocus_u": params_to_forward["ctf_values"]["DefocusU"],
844-
"defocus_v": params_to_forward["ctf_values"]["DefocusV"],
845-
"defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
846-
"particle_diameter": relion_params.particle_diameter,
847-
"boxsize": relion_params.boxsize,
848-
"small_boxsize": relion_params.small_boxsize,
849-
"downscale": relion_params.downscale,
850-
"kv": relion_params.voltage,
851-
"node_creator_queue": machine_config.node_creator_queue,
852-
"session_id": message["session_id"],
853-
"autoproc_program_id": _app_id(
854-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
855-
),
856-
"batch_size": default_spa_parameters.batch_size_2d,
857-
},
858-
"recipes": ["em-spa-extract"],
859-
}
860-
if _transport_object:
861-
zocalo_message["parameters"][
862-
"feedback_queue"
863-
] = _transport_object.feedback_queue
864-
_transport_object.send("processing_recipe", zocalo_message, new_connection=True)
865-
_db.close()
866-
867-
868576
def _release_2d_hold(message: dict, _db=murfey_db):
869577
relion_params, feedback_params = _get_spa_params(message["program_id"], _db)
870578
if not feedback_params.star_combination_job:
@@ -2841,31 +2549,6 @@ def feedback_callback(header: dict, message: dict) -> None:
28412549
if _transport_object:
28422550
_transport_object.transport.ack(header)
28432551
return None
2844-
elif message["register"] == "picked_particles":
2845-
movie = murfey_db.exec(
2846-
select(db.Movie).where(
2847-
db.Movie.murfey_id == message["motion_correction_id"]
2848-
)
2849-
).one()
2850-
movie.preprocessed = True
2851-
murfey_db.add(movie)
2852-
murfey_db.commit()
2853-
feedback_params = murfey_db.exec(
2854-
select(db.SPAFeedbackParameters).where(
2855-
db.SPAFeedbackParameters.pj_id
2856-
== _pj_id(message["program_id"], murfey_db)
2857-
)
2858-
).one()
2859-
if feedback_params.estimate_particle_diameter:
2860-
_register_picked_particles_use_diameter(message)
2861-
else:
2862-
_register_picked_particles_use_boxsize(message)
2863-
prom.preprocessed_movies.labels(
2864-
processing_job=_pj_id(message["program_id"], murfey_db)
2865-
).inc()
2866-
if _transport_object:
2867-
_transport_object.transport.ack(header)
2868-
return None
28692552
elif message["register"] == "done_incomplete_2d_batch":
28702553
_release_2d_hold(message)
28712554
if _transport_object:

src/murfey/util/db.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ class NotificationValue(SQLModel, table=True): # type: ignore
395395
foreign_key="notificationparameter.id", primary_key=True
396396
)
397397
index: int
398+
within_bounds: bool
398399
notification_parameter: Optional[DataCollectionGroup] = Relationship(
399400
back_populates="notification_values"
400401
)

src/murfey/workflows/notifications/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
from typing import Dict, Tuple
23

34
from sqlmodel import Session, select
@@ -15,9 +16,10 @@ def notification_setup(
1516
parameter_name = k[:-3]
1617
else:
1718
continue
18-
if parameter_name in parameters.keys():
19+
snake_parameter_name = re.sub(r"(?<!^)(?=[A-Z])", "_", parameter_name).lower()
20+
if snake_parameter_name in parameters.keys():
1921
continue
20-
parameters[parameter_name] = (
22+
parameters[snake_parameter_name] = (
2123
message.get(f"{parameter_name}Min", 0),
2224
message.get(f"{parameter_name}Max", 10000),
2325
)

0 commit comments

Comments
 (0)