Skip to content

Commit 97fef8d

Browse files
committed
Migrated 'data_collection' feedback-callback block into its own workflows module
1 parent 51717c4 commit 97fef8d

File tree

3 files changed

+104
-85
lines changed

3 files changed

+104
-85
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey"
106106
"clem.process_raw_tiffs" = "murfey.workflows.clem.process_raw_tiffs:zocalo_cluster_request"
107107
"clem.register_align_and_merge_result" = "murfey.workflows.clem.register_align_and_merge_results:register_align_and_merge_result"
108108
"clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run"
109+
"data_collection" = "murfey.workflows.register_data_collection:run"
109110
"data_collection_group" = "murfey.workflows.register_data_collection_group:run"
110111
"pato" = "murfey.workflows.notifications:notification_setup"
111112
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"

src/murfey/server/feedback.py

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import murfey.server
4242
import murfey.server.prometheus as prom
4343
import murfey.util.db as db
44-
from murfey.server.ispyb import ISPyBSession, get_session_id
44+
from murfey.server.ispyb import ISPyBSession
4545
from murfey.server.murfey_db import url # murfey_db
4646
from murfey.util import sanitise
4747
from murfey.util.config import (
@@ -2031,90 +2031,6 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
20312031
"Call to data collection group hook failed", exc_info=True
20322032
)
20332033
return None
2034-
elif message["register"] == "data_collection":
2035-
logger.debug(
2036-
"Received message named 'data_collection' containing the following items:\n"
2037-
f"{', '.join([f'{sanitise(key)}: {sanitise(str(value))}' for key, value in message.items()])}"
2038-
)
2039-
murfey_session_id = message["session_id"]
2040-
ispyb_session_id = get_session_id(
2041-
microscope=message["microscope"],
2042-
proposal_code=message["proposal_code"],
2043-
proposal_number=message["proposal_number"],
2044-
visit_number=message["visit_number"],
2045-
db=ISPyBSession(),
2046-
)
2047-
dcg = _db.exec(
2048-
select(db.DataCollectionGroup)
2049-
.where(db.DataCollectionGroup.session_id == murfey_session_id)
2050-
.where(db.DataCollectionGroup.tag == message["source"])
2051-
).all()
2052-
if dcg:
2053-
dcgid = dcg[0].id
2054-
# flush_data_collections(message["source"], _db)
2055-
else:
2056-
logger.warning(
2057-
"No data collection group ID was found for image directory "
2058-
f"{sanitise(message['image_directory'])} and source "
2059-
f"{sanitise(message['source'])}"
2060-
)
2061-
if murfey.server._transport_object:
2062-
murfey.server._transport_object.transport.nack(header, requeue=True)
2063-
return None
2064-
if dc_murfey := _db.exec(
2065-
select(db.DataCollection)
2066-
.where(db.DataCollection.tag == message.get("tag"))
2067-
.where(db.DataCollection.dcg_id == dcgid)
2068-
).all():
2069-
dcid = dc_murfey[0].id
2070-
else:
2071-
if ispyb_session_id is None:
2072-
murfey_dc = db.DataCollection(
2073-
tag=message.get("tag"),
2074-
dcg_id=dcgid,
2075-
)
2076-
else:
2077-
record = DataCollection(
2078-
SESSIONID=ispyb_session_id,
2079-
experimenttype=message["experiment_type"],
2080-
imageDirectory=message["image_directory"],
2081-
imageSuffix=message["image_suffix"],
2082-
voltage=message["voltage"],
2083-
dataCollectionGroupId=dcgid,
2084-
pixelSizeOnImage=message["pixel_size"],
2085-
imageSizeX=message["image_size_x"],
2086-
imageSizeY=message["image_size_y"],
2087-
slitGapHorizontal=message.get("slit_width"),
2088-
magnification=message.get("magnification"),
2089-
exposureTime=message.get("exposure_time"),
2090-
totalExposedDose=message.get("total_exposed_dose"),
2091-
c2aperture=message.get("c2aperture"),
2092-
phasePlate=int(message.get("phase_plate", 0)),
2093-
)
2094-
dcid = _register(
2095-
record,
2096-
header,
2097-
tag=(
2098-
message.get("tag")
2099-
if message["experiment_type"] == "tomography"
2100-
else ""
2101-
),
2102-
)
2103-
murfey_dc = db.DataCollection(
2104-
id=dcid,
2105-
tag=message.get("tag"),
2106-
dcg_id=dcgid,
2107-
)
2108-
_db.add(murfey_dc)
2109-
_db.commit()
2110-
dcid = murfey_dc.id
2111-
_db.close()
2112-
if dcid is None and murfey.server._transport_object:
2113-
murfey.server._transport_object.transport.nack(header, requeue=True)
2114-
return None
2115-
if murfey.server._transport_object:
2116-
murfey.server._transport_object.transport.ack(header)
2117-
return None
21182034
elif message["register"] == "processing_job":
21192035
murfey_session_id = message["session_id"]
21202036
logger.info("registering processing job")
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import logging
2+
3+
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
4+
from sqlmodel import select
5+
from sqlmodel.orm.session import Session as SQLModelSession
6+
7+
import murfey.util.db as MurfeyDB
8+
from murfey.server import _transport_object
9+
from murfey.server.ispyb import ISPyBSession, get_session_id
10+
from murfey.util import sanitise
11+
12+
logger = logging.getLogger("murfey.workflows.register_data_collection")
13+
14+
15+
def run(
16+
message: dict, murfey_db: SQLModelSession, demo: bool = False
17+
) -> dict[str, bool]:
18+
# Fail immediately if transport manager was not provided
19+
if _transport_object is None:
20+
logger.error("Unable to find transport manager")
21+
return {"success": False, "requeue": False}
22+
23+
logger.info(
24+
"Registering the following data collection:\n"
25+
f"{', '.join([f'{sanitise(key)}: {sanitise(str(value))}' for key, value in message.items()])}"
26+
)
27+
28+
murfey_session_id = message["session_id"]
29+
ispyb_session_id = get_session_id(
30+
microscope=message["microscope"],
31+
proposal_code=message["proposal_code"],
32+
proposal_number=message["proposal_number"],
33+
visit_number=message["visit_number"],
34+
db=ISPyBSession(),
35+
)
36+
dcg = murfey_db.exec(
37+
select(MurfeyDB.DataCollectionGroup)
38+
.where(MurfeyDB.DataCollectionGroup.session_id == murfey_session_id)
39+
.where(MurfeyDB.DataCollectionGroup.tag == message["source"])
40+
).all()
41+
if dcg:
42+
dcgid = dcg[0].id
43+
# flush_data_collections(message["source"], murfey_db)
44+
else:
45+
logger.warning(
46+
"No data collection group ID was found for image directory "
47+
f"{sanitise(message['image_directory'])} and source "
48+
f"{sanitise(message['source'])}"
49+
)
50+
return {"success": False, "requeue": True}
51+
52+
if dc_murfey := murfey_db.exec(
53+
select(MurfeyDB.DataCollection)
54+
.where(MurfeyDB.DataCollection.tag == message.get("tag"))
55+
.where(MurfeyDB.DataCollection.dcg_id == dcgid)
56+
).all():
57+
dcid = dc_murfey[0].id
58+
else:
59+
if ispyb_session_id is None:
60+
murfey_dc = MurfeyDB.DataCollection(
61+
tag=message.get("tag"),
62+
dcg_id=dcgid,
63+
)
64+
else:
65+
record = ISPyBDB.DataCollection(
66+
SESSIONID=ispyb_session_id,
67+
experimenttype=message["experiment_type"],
68+
imageDirectory=message["image_directory"],
69+
imageSuffix=message["image_suffix"],
70+
voltage=message["voltage"],
71+
dataCollectionGroupId=dcgid,
72+
pixelSizeOnImage=message["pixel_size"],
73+
imageSizeX=message["image_size_x"],
74+
imageSizeY=message["image_size_y"],
75+
slitGapHorizontal=message.get("slit_width"),
76+
magnification=message.get("magnification"),
77+
exposureTime=message.get("exposure_time"),
78+
totalExposedDose=message.get("total_exposed_dose"),
79+
c2aperture=message.get("c2aperture"),
80+
phasePlate=int(message.get("phase_plate", 0)),
81+
)
82+
dcid = _transport_object.do_insert_data_collection(
83+
record,
84+
tag=(
85+
message.get("tag")
86+
if message["experiment_type"] == "tomography"
87+
else ""
88+
),
89+
)
90+
murfey_dc = MurfeyDB.DataCollection(
91+
id=dcid,
92+
tag=message.get("tag"),
93+
dcg_id=dcgid,
94+
)
95+
murfey_db.add(murfey_dc)
96+
murfey_db.commit()
97+
dcid = murfey_dc.id
98+
murfey_db.close()
99+
100+
if dcid is None:
101+
return {"success": False, "requeue": True}
102+
return {"success": True}

0 commit comments

Comments
 (0)