Skip to content

Commit 51717c4

Browse files
committed
Migrated 'data_collection_group' feedback-callback block into its own workflows module
1 parent b13c4ad commit 51717c4

File tree

3 files changed

+92
-71
lines changed

3 files changed

+92
-71
lines changed

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ server = [
6262
"aiohttp",
6363
"cryptography",
6464
"fastapi[standard]<0.116.0",
65+
"graypy",
6566
"ispyb>=10.2.4", # Responsible for setting requirements for SQLAlchemy and mysql-connector-python;
6667
"jinja2",
6768
"mrcfile",
@@ -73,7 +74,7 @@ server = [
7374
"python-jose[cryptography]",
7475
"sqlalchemy[postgresql]", # Add as explicit dependency
7576
"sqlmodel",
76-
"stomp-py<=8.1.0", # 8.1.1 (released 2024-04-06) doesn't work with our project
77+
"stomp-py>8.1.0", # 8.1.1 (released 2024-04-06) doesn't work with our project
7778
"zocalo>=1",
7879
]
7980
[project.urls]
@@ -105,6 +106,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey"
105106
"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
106107
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
107108
"clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run"
109+
"data_collection_group" = "murfey.workflows.register_data_collection_group:run"
108110
"pato" = "murfey.workflows.notifications:notification_setup"
109111
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"
110112
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"

src/murfey/server/feedback.py

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import numpy as np
2222
from backports.entry_points_selectable import entry_points
2323
from ispyb.sqlalchemy._auto_db_schema import (
24-
Atlas,
2524
AutoProcProgram,
2625
Base,
2726
DataCollection,
@@ -2012,75 +2011,6 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
20122011
if murfey.server._transport_object:
20132012
murfey.server._transport_object.transport.ack(header)
20142013
return None
2015-
elif message["register"] == "data_collection_group":
2016-
ispyb_session_id = get_session_id(
2017-
microscope=message["microscope"],
2018-
proposal_code=message["proposal_code"],
2019-
proposal_number=message["proposal_number"],
2020-
visit_number=message["visit_number"],
2021-
db=ISPyBSession(),
2022-
)
2023-
if dcg_murfey := _db.exec(
2024-
select(db.DataCollectionGroup)
2025-
.where(db.DataCollectionGroup.session_id == message["session_id"])
2026-
.where(db.DataCollectionGroup.tag == message.get("tag"))
2027-
).all():
2028-
dcgid = dcg_murfey[0].id
2029-
else:
2030-
if ispyb_session_id is None:
2031-
murfey_dcg = db.DataCollectionGroup(
2032-
session_id=message["session_id"],
2033-
tag=message.get("tag"),
2034-
)
2035-
dcgid = murfey_dcg.id
2036-
else:
2037-
record = DataCollectionGroup(
2038-
sessionId=ispyb_session_id,
2039-
experimentType=message["experiment_type"],
2040-
experimentTypeId=message["experiment_type_id"],
2041-
)
2042-
dcgid = _register(record, header)
2043-
atlas_record = Atlas(
2044-
dataCollectionGroupId=dcgid,
2045-
atlasImage=message.get("atlas", ""),
2046-
pixelSize=message.get("atlas_pixel_size", 0),
2047-
cassetteSlot=message.get("sample"),
2048-
)
2049-
if murfey.server._transport_object:
2050-
atlas_id = murfey.server._transport_object.do_insert_atlas(
2051-
atlas_record
2052-
)["return_value"]
2053-
else:
2054-
atlas_id = None
2055-
murfey_dcg = db.DataCollectionGroup(
2056-
id=dcgid,
2057-
atlas_id=atlas_id,
2058-
atlas=message.get("atlas", ""),
2059-
atlas_pixel_size=message.get("atlas_pixel_size"),
2060-
sample=message.get("sample"),
2061-
session_id=message["session_id"],
2062-
tag=message.get("tag"),
2063-
)
2064-
_db.add(murfey_dcg)
2065-
_db.commit()
2066-
_db.close()
2067-
if murfey.server._transport_object:
2068-
if dcgid is None:
2069-
time.sleep(2)
2070-
murfey.server._transport_object.transport.nack(header, requeue=True)
2071-
return None
2072-
murfey.server._transport_object.transport.ack(header)
2073-
if dcg_hooks := entry_points().select(
2074-
group="murfey.hooks", name="data_collection_group"
2075-
):
2076-
try:
2077-
for hook in dcg_hooks:
2078-
hook.load()(dcgid, session_id=message["session_id"])
2079-
except Exception:
2080-
logger.error(
2081-
"Call to data collection group hook failed", exc_info=True
2082-
)
2083-
return None
20842014
elif message["register"] == "atlas_update":
20852015
if murfey.server._transport_object:
20862016
murfey.server._transport_object.do_update_atlas(
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import logging
2+
import time
3+
4+
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
5+
from backports.entry_points_selectable import entry_points
6+
from sqlmodel import select
7+
from sqlmodel.orm.session import Session as SQLModelSession
8+
9+
import murfey.util.db as MurfeyDB
10+
from murfey.server import _transport_object
11+
from murfey.server.ispyb import ISPyBSession, get_session_id
12+
13+
logger = logging.getLogger("murfey.workflows.register_data_collection_group")
14+
15+
16+
def run(
17+
message: dict, murfey_db: SQLModelSession, demo: bool = False
18+
) -> dict[str, bool]:
19+
# Fail immediately if no transport wrapper is found
20+
if _transport_object is None:
21+
logger.error("Unable to find transport manager")
22+
return {"success": False, "requeue": False}
23+
24+
ispyb_session_id = get_session_id(
25+
microscope=message["microscope"],
26+
proposal_code=message["proposal_code"],
27+
proposal_number=message["proposal_number"],
28+
visit_number=message["visit_number"],
29+
db=ISPyBSession(),
30+
)
31+
if dcg_murfey := murfey_db.exec(
32+
select(MurfeyDB.DataCollectionGroup)
33+
.where(MurfeyDB.DataCollectionGroup.session_id == message["session_id"])
34+
.where(MurfeyDB.DataCollectionGroup.tag == message.get("tag"))
35+
).all():
36+
dcgid = dcg_murfey[0].id
37+
else:
38+
if ispyb_session_id is None:
39+
murfey_dcg = MurfeyDB.DataCollectionGroup(
40+
session_id=message["session_id"],
41+
tag=message.get("tag"),
42+
)
43+
dcgid = murfey_dcg.id
44+
else:
45+
record = ISPyBDB.DataCollectionGroup(
46+
sessionId=ispyb_session_id,
47+
experimentType=message["experiment_type"],
48+
experimentTypeId=message["experiment_type_id"],
49+
)
50+
51+
dcgid = _transport_object.do_insert_data_collection_group(record).get(
52+
"return_value", None
53+
)
54+
atlas_record = ISPyBDB.Atlas(
55+
dataCollectionGroupId=dcgid,
56+
atlasImage=message.get("atlas", ""),
57+
pixelSize=message.get("atlas_pixel_size", 0),
58+
cassetteSlot=message.get("sample"),
59+
)
60+
if _transport_object:
61+
atlas_id = _transport_object.do_insert_atlas(atlas_record)[
62+
"return_value"
63+
]
64+
else:
65+
atlas_id = None
66+
murfey_dcg = MurfeyDB.DataCollectionGroup(
67+
id=dcgid,
68+
atlas_id=atlas_id,
69+
atlas=message.get("atlas", ""),
70+
atlas_pixel_size=message.get("atlas_pixel_size"),
71+
sample=message.get("sample"),
72+
session_id=message["session_id"],
73+
tag=message.get("tag"),
74+
)
75+
murfey_db.add(murfey_dcg)
76+
murfey_db.commit()
77+
murfey_db.close()
78+
if dcgid is None:
79+
time.sleep(2)
80+
return {"success": False, "requeue": True}
81+
if dcg_hooks := entry_points().select(
82+
group="murfey.hooks", name="data_collection_group"
83+
):
84+
try:
85+
for hook in dcg_hooks:
86+
hook.load()(dcgid, session_id=message["session_id"])
87+
except Exception:
88+
logger.error("Call to data collection group hook failed", exc_info=True)
89+
return {"success": True}

0 commit comments

Comments
 (0)