Skip to content

Commit e82939c

Browse files
Email notification hookup (#484)
Put in the functionality to setup notification triggering for a data collection group on the receipt of a message from PATo. The conditions for requesting an email notification are the following: - 75% of the last 25 micrographs are outside of the bounds set by the user - The previous condition has been met following a period in which this was not true - No email notification has been sent for the last 500 micrographs --------- Co-authored-by: Eu Pin Tien <[email protected]>
1 parent 96aa5db commit e82939c

File tree

6 files changed

+547
-315
lines changed

6 files changed

+547
-315
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ murfey = "murfey.client:run"
107107
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
108108
"clem.register_lif_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_lif_preprocessing_result"
109109
"clem.register_tiff_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:register_tiff_preprocessing_result"
110+
"pato" = "murfey.workflows.notifications:notification_setup"
111+
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"
110112
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"
111113

112114
[tool.setuptools]

src/murfey/server/__init__.py

Lines changed: 0 additions & 315 deletions
Original file line numberDiff line numberDiff line change
@@ -573,296 +573,6 @@ def _get_spa_params(
573573
return relion_params, feedback_params
574574

575575

576-
def _register_picked_particles_use_diameter(
577-
message: dict, _db=murfey_db, demo: bool = False
578-
):
579-
"""Received picked particles from the autopick service"""
580-
# Add this message to the table of seen messages
581-
params_to_forward = message.get("extraction_parameters")
582-
assert isinstance(params_to_forward, dict)
583-
pj_id = _pj_id(message["program_id"], _db)
584-
ctf_params = db.CtfParameters(
585-
pj_id=pj_id,
586-
micrographs_file=params_to_forward["micrographs_file"],
587-
extract_file=params_to_forward["extract_file"],
588-
coord_list_file=params_to_forward["coord_list_file"],
589-
ctf_image=params_to_forward["ctf_values"]["CtfImage"],
590-
ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
591-
ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
592-
defocus_u=params_to_forward["ctf_values"]["DefocusU"],
593-
defocus_v=params_to_forward["ctf_values"]["DefocusV"],
594-
defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
595-
)
596-
_db.add(ctf_params)
597-
_db.commit()
598-
_db.close()
599-
600-
picking_db_len = _db.exec(
601-
select(func.count(db.ParticleSizes.id)).where(db.ParticleSizes.pj_id == pj_id)
602-
).one()
603-
if picking_db_len > default_spa_parameters.nr_picks_before_diameter:
604-
# If there are enough particles to get a diameter
605-
instrument_name = (
606-
_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
607-
.one()
608-
.instrument_name
609-
)
610-
machine_config = get_machine_config(instrument_name=instrument_name)[
611-
instrument_name
612-
]
613-
relion_params = _db.exec(
614-
select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
615-
).one()
616-
relion_options = dict(relion_params)
617-
feedback_params = _db.exec(
618-
select(db.SPAFeedbackParameters).where(
619-
db.SPAFeedbackParameters.pj_id == pj_id
620-
)
621-
).one()
622-
623-
if feedback_params.picker_ispyb_id is None:
624-
if demo or not _transport_object:
625-
feedback_params.picker_ispyb_id = 1000
626-
else:
627-
assert feedback_params.picker_murfey_id is not None
628-
feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
629-
message["program_id"], feedback_params.picker_murfey_id
630-
)
631-
if feedback_params.picker_ispyb_id is not None:
632-
_flush_class2d(message["session_id"], message["program_id"], _db)
633-
_db.add(feedback_params)
634-
_db.commit()
635-
selection_stash = _db.exec(
636-
select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
637-
).all()
638-
for s in selection_stash:
639-
_register_class_selection(
640-
{
641-
"session_id": s.session_id,
642-
"class_selection_score": s.class_selection_score or 0,
643-
},
644-
_db=_db,
645-
demo=demo,
646-
)
647-
_db.delete(s)
648-
_db.commit()
649-
650-
# Calculate diameter if it wasn't provided
651-
if not relion_params.particle_diameter:
652-
# If the diameter has not been calculated then find it
653-
picking_db = _db.exec(
654-
select(db.ParticleSizes.particle_size).where(
655-
db.ParticleSizes.pj_id == pj_id
656-
)
657-
).all()
658-
relion_params.particle_diameter = np.quantile(list(picking_db), 0.75)
659-
_db.add(relion_params)
660-
_db.commit()
661-
662-
ctf_db = _db.exec(
663-
select(db.CtfParameters).where(db.CtfParameters.pj_id == pj_id)
664-
).all()
665-
for saved_message in ctf_db:
666-
# Send on all saved messages to extraction
667-
_db.expunge(saved_message)
668-
zocalo_message: dict = {
669-
"parameters": {
670-
"micrographs_file": saved_message.micrographs_file,
671-
"coord_list_file": saved_message.coord_list_file,
672-
"output_file": saved_message.extract_file,
673-
"pixel_size": (
674-
relion_options["angpix"]
675-
* relion_options["motion_corr_binning"]
676-
),
677-
"ctf_image": saved_message.ctf_image,
678-
"ctf_max_resolution": saved_message.ctf_max_resolution,
679-
"ctf_figure_of_merit": saved_message.ctf_figure_of_merit,
680-
"defocus_u": saved_message.defocus_u,
681-
"defocus_v": saved_message.defocus_v,
682-
"defocus_angle": saved_message.defocus_angle,
683-
"particle_diameter": relion_params.particle_diameter,
684-
"downscale": relion_options["downscale"],
685-
"kv": relion_options["voltage"],
686-
"node_creator_queue": machine_config.node_creator_queue,
687-
"session_id": message["session_id"],
688-
"autoproc_program_id": _app_id(
689-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"),
690-
_db,
691-
),
692-
"batch_size": default_spa_parameters.batch_size_2d,
693-
},
694-
"recipes": ["em-spa-extract"],
695-
}
696-
if _transport_object:
697-
zocalo_message["parameters"][
698-
"feedback_queue"
699-
] = _transport_object.feedback_queue
700-
_transport_object.send(
701-
"processing_recipe", zocalo_message, new_connection=True
702-
)
703-
# Use provided diameter for next step
704-
else:
705-
# If the diameter is known then just send the new message
706-
zocalo_message = {
707-
"parameters": {
708-
"micrographs_file": params_to_forward["micrographs_file"],
709-
"coord_list_file": params_to_forward["coord_list_file"],
710-
"output_file": params_to_forward["extract_file"],
711-
"pixel_size": (
712-
relion_options["angpix"] * relion_options["motion_corr_binning"]
713-
),
714-
"ctf_image": params_to_forward["ctf_values"]["CtfImage"],
715-
"ctf_max_resolution": params_to_forward["ctf_values"][
716-
"CtfMaxResolution"
717-
],
718-
"ctf_figure_of_merit": params_to_forward["ctf_values"][
719-
"CtfFigureOfMerit"
720-
],
721-
"defocus_u": params_to_forward["ctf_values"]["DefocusU"],
722-
"defocus_v": params_to_forward["ctf_values"]["DefocusV"],
723-
"defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
724-
"particle_diameter": relion_params.particle_diameter,
725-
"downscale": relion_options["downscale"],
726-
"kv": relion_options["voltage"],
727-
"node_creator_queue": machine_config.node_creator_queue,
728-
"session_id": message["session_id"],
729-
"autoproc_program_id": _app_id(
730-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
731-
),
732-
"batch_size": default_spa_parameters.batch_size_2d,
733-
},
734-
"recipes": ["em-spa-extract"],
735-
}
736-
if _transport_object:
737-
zocalo_message["parameters"][
738-
"feedback_queue"
739-
] = _transport_object.feedback_queue
740-
_transport_object.send(
741-
"processing_recipe", zocalo_message, new_connection=True
742-
)
743-
if demo:
744-
_register_incomplete_2d_batch(
745-
{
746-
"session_id": message["session_id"],
747-
"program_id": message["program_id"],
748-
"class2d_message": {
749-
"particles_file": "Select/job009/particles_split_1.star",
750-
"class2d_dir": "Class2D",
751-
"batch_size": 50000,
752-
},
753-
},
754-
_db=_db,
755-
demo=demo,
756-
)
757-
758-
else:
759-
# If not enough particles then save the new sizes
760-
particle_list = message.get("particle_diameters")
761-
assert isinstance(particle_list, list)
762-
for particle in particle_list:
763-
new_particle = db.ParticleSizes(pj_id=pj_id, particle_size=particle)
764-
_db.add(new_particle)
765-
_db.commit()
766-
_db.close()
767-
768-
769-
def _register_picked_particles_use_boxsize(message: dict, _db=murfey_db):
770-
"""Received picked particles from the autopick service"""
771-
# Add this message to the table of seen messages
772-
params_to_forward = message.get("extraction_parameters")
773-
assert isinstance(params_to_forward, dict)
774-
775-
instrument_name = (
776-
_db.exec(select(db.Session).where(db.Session.id == message["session_id"]))
777-
.one()
778-
.instrument_name
779-
)
780-
machine_config = get_machine_config(instrument_name=instrument_name)[
781-
instrument_name
782-
]
783-
pj_id = _pj_id(message["program_id"], _db)
784-
ctf_params = db.CtfParameters(
785-
pj_id=pj_id,
786-
micrographs_file=params_to_forward["micrographs_file"],
787-
coord_list_file=params_to_forward["coord_list_file"],
788-
extract_file=params_to_forward["extract_file"],
789-
ctf_image=params_to_forward["ctf_values"]["CtfImage"],
790-
ctf_max_resolution=params_to_forward["ctf_values"]["CtfMaxResolution"],
791-
ctf_figure_of_merit=params_to_forward["ctf_values"]["CtfFigureOfMerit"],
792-
defocus_u=params_to_forward["ctf_values"]["DefocusU"],
793-
defocus_v=params_to_forward["ctf_values"]["DefocusV"],
794-
defocus_angle=params_to_forward["ctf_values"]["DefocusAngle"],
795-
)
796-
_db.add(ctf_params)
797-
_db.commit()
798-
_db.close()
799-
800-
# Set particle diameter as zero and send box sizes
801-
relion_params = _db.exec(
802-
select(db.SPARelionParameters).where(db.SPARelionParameters.pj_id == pj_id)
803-
).one()
804-
feedback_params = _db.exec(
805-
select(db.SPAFeedbackParameters).where(db.SPAFeedbackParameters.pj_id == pj_id)
806-
).one()
807-
808-
if feedback_params.picker_ispyb_id is None and _transport_object:
809-
assert feedback_params.picker_murfey_id is not None
810-
feedback_params.picker_ispyb_id = _transport_object.do_buffer_lookup(
811-
message["program_id"], feedback_params.picker_murfey_id
812-
)
813-
if feedback_params.picker_ispyb_id is not None:
814-
_flush_class2d(message["session_id"], message["program_id"], _db)
815-
_db.add(feedback_params)
816-
_db.commit()
817-
selection_stash = _db.exec(
818-
select(db.SelectionStash).where(db.SelectionStash.pj_id == pj_id)
819-
).all()
820-
for s in selection_stash:
821-
_register_class_selection(
822-
{
823-
"session_id": s.session_id,
824-
"class_selection_score": s.class_selection_score or 0,
825-
},
826-
_db=_db,
827-
)
828-
_db.delete(s)
829-
_db.commit()
830-
831-
# Send the message to extraction with the box sizes
832-
zocalo_message: dict = {
833-
"parameters": {
834-
"micrographs_file": params_to_forward["micrographs_file"],
835-
"coord_list_file": params_to_forward["coord_list_file"],
836-
"output_file": params_to_forward["extract_file"],
837-
"pixel_size": relion_params.angpix * relion_params.motion_corr_binning,
838-
"ctf_image": params_to_forward["ctf_values"]["CtfImage"],
839-
"ctf_max_resolution": params_to_forward["ctf_values"]["CtfMaxResolution"],
840-
"ctf_figure_of_merit": params_to_forward["ctf_values"]["CtfFigureOfMerit"],
841-
"defocus_u": params_to_forward["ctf_values"]["DefocusU"],
842-
"defocus_v": params_to_forward["ctf_values"]["DefocusV"],
843-
"defocus_angle": params_to_forward["ctf_values"]["DefocusAngle"],
844-
"particle_diameter": relion_params.particle_diameter,
845-
"boxsize": relion_params.boxsize,
846-
"small_boxsize": relion_params.small_boxsize,
847-
"downscale": relion_params.downscale,
848-
"kv": relion_params.voltage,
849-
"node_creator_queue": machine_config.node_creator_queue,
850-
"session_id": message["session_id"],
851-
"autoproc_program_id": _app_id(
852-
_pj_id(message["program_id"], _db, recipe="em-spa-extract"), _db
853-
),
854-
"batch_size": default_spa_parameters.batch_size_2d,
855-
},
856-
"recipes": ["em-spa-extract"],
857-
}
858-
if _transport_object:
859-
zocalo_message["parameters"][
860-
"feedback_queue"
861-
] = _transport_object.feedback_queue
862-
_transport_object.send("processing_recipe", zocalo_message, new_connection=True)
863-
_db.close()
864-
865-
866576
def _release_2d_hold(message: dict, _db=murfey_db):
867577
relion_params, feedback_params = _get_spa_params(message["program_id"], _db)
868578
if not feedback_params.star_combination_job:
@@ -2851,31 +2561,6 @@ def feedback_callback(header: dict, message: dict) -> None:
28512561
if _transport_object:
28522562
_transport_object.transport.ack(header)
28532563
return None
2854-
elif message["register"] == "picked_particles":
2855-
movie = murfey_db.exec(
2856-
select(db.Movie).where(
2857-
db.Movie.murfey_id == message["motion_correction_id"]
2858-
)
2859-
).one()
2860-
movie.preprocessed = True
2861-
murfey_db.add(movie)
2862-
murfey_db.commit()
2863-
feedback_params = murfey_db.exec(
2864-
select(db.SPAFeedbackParameters).where(
2865-
db.SPAFeedbackParameters.pj_id
2866-
== _pj_id(message["program_id"], murfey_db)
2867-
)
2868-
).one()
2869-
if feedback_params.estimate_particle_diameter:
2870-
_register_picked_particles_use_diameter(message)
2871-
else:
2872-
_register_picked_particles_use_boxsize(message)
2873-
prom.preprocessed_movies.labels(
2874-
processing_job=_pj_id(message["program_id"], murfey_db)
2875-
).inc()
2876-
if _transport_object:
2877-
_transport_object.transport.ack(header)
2878-
return None
28792564
elif message["register"] == "done_incomplete_2d_batch":
28802565
_release_2d_hold(message)
28812566
if _transport_object:

src/murfey/util/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class MachineConfig(BaseModel, extra=Extra.allow): # type: ignore
7070
security_configuration_path: Optional[Path] = None
7171
auth_url: str = ""
7272

73+
notifications_queue: str = "pato_notification"
74+
7375

7476
def from_file(config_file_path: Path, instrument: str = "") -> Dict[str, MachineConfig]:
7577
with open(config_file_path, "r") as config_stream:

src/murfey/util/db.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,10 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore
360360
back_populates="data_collection_group",
361361
sa_relationship_kwargs={"cascade": "delete"},
362362
)
363+
notification_parameters: List["NotificationParameter"] = Relationship(
364+
back_populates="data_collection_group",
365+
sa_relationship_kwargs={"cascade": "delete"},
366+
)
363367
tomography_preprocessing_parameters: List["TomographyPreprocessingParameters"] = (
364368
Relationship(
365369
back_populates="data_collection_group",
@@ -368,6 +372,33 @@ class DataCollectionGroup(SQLModel, table=True): # type: ignore
368372
)
369373

370374

375+
class NotificationParameter(SQLModel, table=True): # type: ignore
376+
id: Optional[int] = Field(default=None, primary_key=True)
377+
dcg_id: int = Field(foreign_key="datacollectiongroup.id")
378+
name: str
379+
min_value: float
380+
max_value: float
381+
num_instances_since_triggered: int = 0
382+
notification_active: bool = False
383+
data_collection_group: Optional[DataCollectionGroup] = Relationship(
384+
back_populates="notification_parameters"
385+
)
386+
notification_values: List["NotificationValue"] = Relationship(
387+
back_populates="notification_parameter",
388+
sa_relationship_kwargs={"cascade": "delete"},
389+
)
390+
391+
392+
class NotificationValue(SQLModel, table=True): # type: ignore
393+
id: Optional[int] = Field(default=None, primary_key=True)
394+
notification_parameter_id: int = Field(foreign_key="notificationparameter.id")
395+
index: int
396+
within_bounds: bool
397+
notification_parameter: Optional[NotificationParameter] = Relationship(
398+
back_populates="notification_values"
399+
)
400+
401+
371402
class DataCollection(SQLModel, table=True): # type: ignore
372403
id: int = Field(primary_key=True, unique=True)
373404
tag: str = Field(primary_key=True)

0 commit comments

Comments
 (0)