Skip to content

Commit 4591e76

Browse files
committed
Migrated 'processing_job' feedback-callback block to its own workflows module
1 parent 9034eb8 commit 4591e76

File tree

3 files changed

+100
-78
lines changed

3 files changed

+100
-78
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey"
112112
"pato" = "murfey.workflows.notifications:notification_setup"
113113
"picked_particles" = "murfey.workflows.spa.picking:particles_picked"
114114
"picked_tomogram" = "murfey.workflows.tomo.picking:picked_tomogram"
115+
"processing_job" = "murfey.workflows.register_processing_job:run"
115116
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"
116117

117118
[tool.setuptools]

src/murfey/server/feedback.py

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
DataCollection,
2727
DataCollectionGroup,
2828
ProcessingJob,
29-
ProcessingJobParameter,
3029
)
3130
from sqlalchemy import func
3231
from sqlalchemy.exc import (
@@ -41,7 +40,6 @@
4140
import murfey.server
4241
import murfey.server.prometheus as prom
4342
import murfey.util.db as db
44-
from murfey.server.ispyb import ISPyBSession
4543
from murfey.server.murfey_db import url # murfey_db
4644
from murfey.util import sanitise
4745
from murfey.util.config import (
@@ -1901,7 +1899,6 @@ def _save_bfactor(message: dict, _db, demo: bool = False):
19011899

19021900
def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
19031901
try:
1904-
record = None
19051902
if "environment" in message:
19061903
params = message["recipe"][str(message["recipe-pointer"])].get(
19071904
"parameters", {}
@@ -2013,81 +2010,6 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
20132010
if murfey.server._transport_object:
20142011
murfey.server._transport_object.transport.ack(header)
20152012
return None
2016-
elif message["register"] == "processing_job":
2017-
murfey_session_id = message["session_id"]
2018-
logger.info("registering processing job")
2019-
dc = _db.exec(
2020-
select(db.DataCollection, db.DataCollectionGroup)
2021-
.where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
2022-
.where(db.DataCollectionGroup.session_id == murfey_session_id)
2023-
.where(db.DataCollectionGroup.tag == message["source"])
2024-
.where(db.DataCollection.tag == message["tag"])
2025-
).all()
2026-
if dc:
2027-
_dcid = dc[0][0].id
2028-
else:
2029-
logger.warning(
2030-
f"No data collection ID found for {sanitise(message['tag'])}"
2031-
)
2032-
if murfey.server._transport_object:
2033-
murfey.server._transport_object.transport.nack(header, requeue=True)
2034-
return None
2035-
if pj_murfey := _db.exec(
2036-
select(db.ProcessingJob)
2037-
.where(db.ProcessingJob.recipe == message["recipe"])
2038-
.where(db.ProcessingJob.dc_id == _dcid)
2039-
).all():
2040-
pid = pj_murfey[0].id
2041-
else:
2042-
if ISPyBSession() is None:
2043-
murfey_pj = db.ProcessingJob(recipe=message["recipe"], dc_id=_dcid)
2044-
else:
2045-
record = ProcessingJob(
2046-
dataCollectionId=_dcid, recipe=message["recipe"]
2047-
)
2048-
run_parameters = message.get("parameters", {})
2049-
assert isinstance(run_parameters, dict)
2050-
if message.get("job_parameters"):
2051-
job_parameters = [
2052-
ProcessingJobParameter(parameterKey=k, parameterValue=v)
2053-
for k, v in message["job_parameters"].items()
2054-
]
2055-
pid = _register(ExtendedRecord(record, job_parameters), header)
2056-
else:
2057-
pid = _register(record, header)
2058-
murfey_pj = db.ProcessingJob(
2059-
id=pid, recipe=message["recipe"], dc_id=_dcid
2060-
)
2061-
_db.add(murfey_pj)
2062-
_db.commit()
2063-
pid = murfey_pj.id
2064-
_db.close()
2065-
if pid is None and murfey.server._transport_object:
2066-
murfey.server._transport_object.transport.nack(header, requeue=True)
2067-
return None
2068-
prom.preprocessed_movies.labels(processing_job=pid)
2069-
if not _db.exec(
2070-
select(db.AutoProcProgram).where(db.AutoProcProgram.pj_id == pid)
2071-
).all():
2072-
if ISPyBSession() is None:
2073-
murfey_app = db.AutoProcProgram(pj_id=pid)
2074-
else:
2075-
record = AutoProcProgram(
2076-
processingJobId=pid, processingStartTime=datetime.now()
2077-
)
2078-
appid = _register(record, header)
2079-
if appid is None and murfey.server._transport_object:
2080-
murfey.server._transport_object.transport.nack(
2081-
header, requeue=True
2082-
)
2083-
return None
2084-
murfey_app = db.AutoProcProgram(id=appid, pj_id=pid)
2085-
_db.add(murfey_app)
2086-
_db.commit()
2087-
_db.close()
2088-
if murfey.server._transport_object:
2089-
murfey.server._transport_object.transport.ack(header)
2090-
return None
20912013
elif message["register"] == "flush_tomography_preprocess":
20922014
_flush_tomography_preprocessing(message, _db)
20932015
if murfey.server._transport_object:
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import logging
2+
from datetime import datetime
3+
4+
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
5+
from sqlmodel import select
6+
from sqlmodel.orm.session import Session as SQLModelSession
7+
8+
import murfey.server.prometheus as prom
9+
import murfey.util.db as MurfeyDB
10+
from murfey.server import _transport_object
11+
from murfey.server.ispyb import ISPyBSession
12+
from murfey.util import sanitise
13+
14+
logger = logging.getLogger("murfey.workflows.register_processing_job")
15+
16+
17+
def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
18+
# Faill immediately if not transport manager is set
19+
if _transport_object is None:
20+
logger.error("Unable to find transport manager")
21+
return {"success": False, "requeue": False}
22+
23+
logger.info(f"Registering the following processing job: \n{message}")
24+
25+
murfey_session_id = message["session_id"]
26+
dc = murfey_db.exec(
27+
select(MurfeyDB.DataCollection, MurfeyDB.DataCollectionGroup)
28+
.where(MurfeyDB.DataCollection.dcg_id == MurfeyDB.DataCollectionGroup.id)
29+
.where(MurfeyDB.DataCollectionGroup.session_id == murfey_session_id)
30+
.where(MurfeyDB.DataCollectionGroup.tag == message["source"])
31+
.where(MurfeyDB.DataCollection.tag == message["tag"])
32+
).all()
33+
34+
if dc:
35+
_dcid = dc[0][0].id
36+
else:
37+
logger.warning(f"No data collection ID found for {sanitise(message['tag'])}")
38+
return {"success": False, "requeue": True}
39+
if pj_murfey := murfey_db.exec(
40+
select(MurfeyDB.ProcessingJob)
41+
.where(MurfeyDB.ProcessingJob.recipe == message["recipe"])
42+
.where(MurfeyDB.ProcessingJob.dc_id == _dcid)
43+
).all():
44+
pid = pj_murfey[0].id
45+
else:
46+
if ISPyBSession() is None:
47+
murfey_pj = MurfeyDB.ProcessingJob(recipe=message["recipe"], dc_id=_dcid)
48+
else:
49+
record = ISPyBDB.ProcessingJob(
50+
dataCollectionId=_dcid, recipe=message["recipe"]
51+
)
52+
run_parameters = message.get("parameters", {})
53+
assert isinstance(run_parameters, dict)
54+
if message.get("job_parameters"):
55+
job_parameters = [
56+
ISPyBDB.ProcessingJobParameter(parameterKey=k, parameterValue=v)
57+
for k, v in message["job_parameters"].items()
58+
]
59+
pid = _transport_object.do_create_ispyb_job(
60+
record, params=job_parameters
61+
).get("return_value", None)
62+
else:
63+
pid = _transport_object.do_create_ispyb_job(record).get(
64+
"return_value", None
65+
)
66+
murfey_pj = MurfeyDB.ProcessingJob(
67+
id=pid, recipe=message["recipe"], dc_id=_dcid
68+
)
69+
murfey_db.add(murfey_pj)
70+
murfey_db.commit()
71+
pid = murfey_pj.id
72+
murfey_db.close()
73+
74+
if pid is None:
75+
return {"success": False, "requeue": True}
76+
77+
# Update Prometheus counter for preprocessed movies
78+
prom.preprocessed_movies.labels(processing_job=pid)
79+
80+
# Register AutoProcProgram database entry if it doesn't already exist
81+
if not murfey_db.exec(
82+
select(MurfeyDB.AutoProcProgram).where(MurfeyDB.AutoProcProgram.pj_id == pid)
83+
).all():
84+
if ISPyBSession() is None:
85+
murfey_app = MurfeyDB.AutoProcProgram(pj_id=pid)
86+
else:
87+
record = ISPyBDB.AutoProcProgram(
88+
processingJobId=pid, processingStartTime=datetime.now()
89+
)
90+
appid = _transport_object.do_update_processing_status(record).get(
91+
"return_value", None
92+
)
93+
if appid is None:
94+
return {"success": False, "requeue": True}
95+
murfey_app = MurfeyDB.AutoProcProgram(id=appid, pj_id=pid)
96+
murfey_db.add(murfey_app)
97+
murfey_db.commit()
98+
murfey_db.close()
99+
return {"success": True}

0 commit comments

Comments
 (0)