Skip to content

Commit 7c6548f

Browse files
committed
If there is no ISPyB connection then autogenerate data collection and processing job IDs in the Murfey database
1 parent 60ec9a1 commit 7c6548f

File tree

3 files changed

+134
-122
lines changed

3 files changed

+134
-122
lines changed

src/murfey/server/__init__.py

Lines changed: 95 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -2572,17 +2572,23 @@ def feedback_callback(header: dict, message: dict) -> None:
25722572
).all():
25732573
dcgid = dcg_murfey[0].id
25742574
else:
2575-
record = DataCollectionGroup(
2576-
sessionId=ispyb_session_id,
2577-
experimentType=message["experiment_type"],
2578-
experimentTypeId=message["experiment_type_id"],
2579-
)
2580-
dcgid = _register(record, header)
2581-
murfey_dcg = db.DataCollectionGroup(
2582-
id=dcgid,
2583-
session_id=message["session_id"],
2584-
tag=message.get("tag"),
2585-
)
2575+
if ispyb_session_id is None:
2576+
murfey_dcg = db.DataCollectionGroup(
2577+
session_id=message["session_id"],
2578+
tag=message.get("tag"),
2579+
)
2580+
else:
2581+
record = DataCollectionGroup(
2582+
sessionId=ispyb_session_id,
2583+
experimentType=message["experiment_type"],
2584+
experimentTypeId=message["experiment_type_id"],
2585+
)
2586+
dcgid = _register(record, header)
2587+
murfey_dcg = db.DataCollectionGroup(
2588+
id=dcgid,
2589+
session_id=message["session_id"],
2590+
tag=message.get("tag"),
2591+
)
25862592
murfey_db.add(murfey_dcg)
25872593
murfey_db.commit()
25882594
murfey_db.close()
@@ -2635,60 +2641,66 @@ def feedback_callback(header: dict, message: dict) -> None:
26352641
).all():
26362642
dcid = dc_murfey[0].id
26372643
else:
2638-
record = DataCollection(
2639-
SESSIONID=ispyb_session_id,
2640-
experimenttype=message["experiment_type"],
2641-
imageDirectory=message["image_directory"],
2642-
imageSuffix=message["image_suffix"],
2643-
voltage=message["voltage"],
2644-
dataCollectionGroupId=dcgid,
2645-
pixelSizeOnImage=message["pixel_size"],
2646-
imageSizeX=message["image_size_x"],
2647-
imageSizeY=message["image_size_y"],
2648-
slitGapHorizontal=message.get("slit_width"),
2649-
magnification=message.get("magnification"),
2650-
exposureTime=message.get("exposure_time"),
2651-
totalExposedDose=message.get("total_exposed_dose"),
2652-
c2aperture=message.get("c2aperture"),
2653-
phasePlate=int(message.get("phase_plate", 0)),
2654-
)
2655-
dcid = _register(
2656-
record,
2657-
header,
2658-
tag=(
2659-
message.get("tag")
2660-
if message["experiment_type"] == "tomography"
2661-
else ""
2662-
),
2663-
)
2664-
murfey_dc = db.DataCollection(
2665-
id=dcid,
2666-
tag=message.get("tag"),
2667-
dcg_id=dcgid,
2668-
)
2644+
if ispyb_session_id is None:
2645+
murfey_dc = db.DataCollection(
2646+
tag=message.get("tag"),
2647+
dcg_id=dcgid,
2648+
)
2649+
else:
2650+
record = DataCollection(
2651+
SESSIONID=ispyb_session_id,
2652+
experimenttype=message["experiment_type"],
2653+
imageDirectory=message["image_directory"],
2654+
imageSuffix=message["image_suffix"],
2655+
voltage=message["voltage"],
2656+
dataCollectionGroupId=dcgid,
2657+
pixelSizeOnImage=message["pixel_size"],
2658+
imageSizeX=message["image_size_x"],
2659+
imageSizeY=message["image_size_y"],
2660+
slitGapHorizontal=message.get("slit_width"),
2661+
magnification=message.get("magnification"),
2662+
exposureTime=message.get("exposure_time"),
2663+
totalExposedDose=message.get("total_exposed_dose"),
2664+
c2aperture=message.get("c2aperture"),
2665+
phasePlate=int(message.get("phase_plate", 0)),
2666+
)
2667+
dcid = _register(
2668+
record,
2669+
header,
2670+
tag=(
2671+
message.get("tag")
2672+
if message["experiment_type"] == "tomography"
2673+
else ""
2674+
),
2675+
)
2676+
murfey_dc = db.DataCollection(
2677+
id=dcid,
2678+
tag=message.get("tag"),
2679+
dcg_id=dcgid,
2680+
)
26692681
murfey_db.add(murfey_dc)
26702682
murfey_db.commit()
2683+
dcid = murfey_dc.id
26712684
murfey_db.close()
26722685
if dcid is None and _transport_object:
26732686
_transport_object.transport.nack(header, requeue=True)
26742687
return None
2675-
if global_state.get("data_collection_ids") and isinstance(
2676-
global_state["data_collection_ids"], dict
2677-
):
2678-
global_state["data_collection_ids"] = {
2679-
**global_state["data_collection_ids"],
2680-
message.get("tag"): dcid,
2681-
}
2682-
else:
2683-
global_state["data_collection_ids"] = {message.get("tag"): dcid}
26842688
if _transport_object:
26852689
_transport_object.transport.ack(header)
26862690
return None
26872691
elif message["register"] == "processing_job":
26882692
logger.info("registering processing job")
26892693
assert isinstance(global_state["data_collection_ids"], dict)
2690-
_dcid = global_state["data_collection_ids"].get(message["tag"])
2691-
if _dcid is None:
2694+
dc = murfey_db.exec(
2695+
select(db.DataCollection, db.DataCollectionGroup)
2696+
.where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
2697+
.where(db.DataCollectionGroup.session_id == murfey_session_id)
2698+
.where(db.DataCollectionGroup.tag == message["source"])
2699+
.where(db.DataCollection.tag == message["tag"])
2700+
).all()
2701+
if dc:
2702+
_dcid = dc[0][0].id
2703+
else:
26922704
logger.warning(f"No data collection ID found for {message['tag']}")
26932705
if _transport_object:
26942706
_transport_object.transport.nack(header, requeue=True)
@@ -2700,38 +2712,33 @@ def feedback_callback(header: dict, message: dict) -> None:
27002712
).all():
27012713
pid = pj_murfey[0].id
27022714
else:
2703-
record = ProcessingJob(dataCollectionId=_dcid, recipe=message["recipe"])
2704-
run_parameters = message.get("parameters", {})
2705-
assert isinstance(run_parameters, dict)
2706-
if message.get("job_parameters"):
2707-
job_parameters = [
2708-
ProcessingJobParameter(parameterKey=k, parameterValue=v)
2709-
for k, v in message["job_parameters"].items()
2710-
]
2711-
pid = _register(ExtendedRecord(record, job_parameters), header)
2715+
if murfey.server.ispyb.Session() is None:
2716+
murfey_pj = db.ProcessingJob(recipe=message["recipe"], dc_id=_dcid)
27122717
else:
2713-
pid = _register(record, header)
2714-
murfey_pj = db.ProcessingJob(
2715-
id=pid, recipe=message["recipe"], dc_id=_dcid
2716-
)
2718+
record = ProcessingJob(
2719+
dataCollectionId=_dcid, recipe=message["recipe"]
2720+
)
2721+
run_parameters = message.get("parameters", {})
2722+
assert isinstance(run_parameters, dict)
2723+
if message.get("job_parameters"):
2724+
job_parameters = [
2725+
ProcessingJobParameter(parameterKey=k, parameterValue=v)
2726+
for k, v in message["job_parameters"].items()
2727+
]
2728+
pid = _register(ExtendedRecord(record, job_parameters), header)
2729+
else:
2730+
pid = _register(record, header)
2731+
murfey_pj = db.ProcessingJob(
2732+
id=pid, recipe=message["recipe"], dc_id=_dcid
2733+
)
27172734
murfey_db.add(murfey_pj)
27182735
murfey_db.commit()
2736+
pid = murfey_pj.id
27192737
murfey_db.close()
27202738
if pid is None and _transport_object:
27212739
_transport_object.transport.nack(header, requeue=True)
27222740
return None
27232741
prom.preprocessed_movies.labels(processing_job=pid)
2724-
if global_state.get("processing_job_ids"):
2725-
global_state["processing_job_ids"] = {
2726-
**global_state["processing_job_ids"], # type: ignore
2727-
message.get("tag"): {
2728-
**global_state["processing_job_ids"].get(message.get("tag"), {}), # type: ignore
2729-
message["recipe"]: pid,
2730-
},
2731-
}
2732-
else:
2733-
prids = {message["tag"]: {message["recipe"]: pid}}
2734-
global_state["processing_job_ids"] = prids
27352742
if message.get("job_parameters"):
27362743
if _transport_object:
27372744
_transport_object.transport.ack(header)
@@ -2741,30 +2748,20 @@ def feedback_callback(header: dict, message: dict) -> None:
27412748
).all():
27422749
appid = app_murfey[0].id
27432750
else:
2744-
record = AutoProcProgram(
2745-
processingJobId=pid, processingStartTime=datetime.now()
2746-
)
2747-
appid = _register(record, header)
2748-
if appid is None and _transport_object:
2749-
_transport_object.transport.nack(header, requeue=True)
2750-
return None
2751-
murfey_app = db.AutoProcProgram(id=appid, pj_id=pid)
2751+
if murfey.server.ispyb.Session() is None:
2752+
murfey_app = db.AutoProcProgram(pj_id=pid)
2753+
else:
2754+
record = AutoProcProgram(
2755+
processingJobId=pid, processingStartTime=datetime.now()
2756+
)
2757+
appid = _register(record, header)
2758+
if appid is None and _transport_object:
2759+
_transport_object.transport.nack(header, requeue=True)
2760+
return None
2761+
murfey_app = db.AutoProcProgram(id=appid, pj_id=pid)
27522762
murfey_db.add(murfey_app)
27532763
murfey_db.commit()
27542764
murfey_db.close()
2755-
if global_state.get("autoproc_program_ids"):
2756-
assert isinstance(global_state["autoproc_program_ids"], dict)
2757-
global_state["autoproc_program_ids"] = {
2758-
**global_state["autoproc_program_ids"],
2759-
message.get("tag"): {
2760-
**global_state["autoproc_program_ids"].get(message.get("tag"), {}), # type: ignore
2761-
message["recipe"]: appid,
2762-
},
2763-
}
2764-
else:
2765-
global_state["autoproc_program_ids"] = {
2766-
message["tag"]: {message["recipe"]: appid}
2767-
}
27682765
if _transport_object:
27692766
_transport_object.transport.ack(header)
27702767
return None

src/murfey/server/demo_api.py

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -843,26 +843,31 @@ def register_tilt(visit_name: str, client_id: int, tilt_info: TiltInfo, db=murfe
843843
db.commit()
844844

845845

846+
# @router.get("/instruments/{instrument_name}/visits_raw", response_model=List[Visit])
847+
# def get_current_visits(instrument_name: str):
848+
# return [
849+
# Visit(
850+
# start=datetime.datetime.now(),
851+
# end=datetime.datetime.now() + datetime.timedelta(days=1),
852+
# session_id=1,
853+
# name="cm31111-2",
854+
# beamline="m12",
855+
# proposal_title="Nothing of importance",
856+
# ),
857+
# Visit(
858+
# start=datetime.datetime.now(),
859+
# end=datetime.datetime.now() + datetime.timedelta(days=1),
860+
# session_id=1,
861+
# name="cm31111-3",
862+
# beamline="m12",
863+
# proposal_title="Nothing of importance",
864+
# ),
865+
# ]
866+
867+
846868
@router.get("/instruments/{instrument_name}/visits_raw", response_model=List[Visit])
847-
def get_current_visits(instrument_name: str):
848-
return [
849-
Visit(
850-
start=datetime.datetime.now(),
851-
end=datetime.datetime.now() + datetime.timedelta(days=1),
852-
session_id=1,
853-
name="cm31111-2",
854-
beamline="m12",
855-
proposal_title="Nothing of importance",
856-
),
857-
Visit(
858-
start=datetime.datetime.now(),
859-
end=datetime.datetime.now() + datetime.timedelta(days=1),
860-
session_id=1,
861-
name="cm31111-3",
862-
beamline="m12",
863-
proposal_title="Nothing of importance",
864-
),
865-
]
869+
def get_current_visits(instrument_name: str, db=murfey.server.ispyb.DB):
870+
return murfey.server.ispyb.get_all_ongoing_visits(instrument_name, db)
866871

867872

868873
@router.get("/visits/{visit_name}")

src/murfey/server/ispyb.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import datetime
44
import logging
5+
import os
56
from typing import Callable, List, Optional
67

78
import ispyb
@@ -36,7 +37,7 @@
3637
bind=sqlalchemy.create_engine(url(), connect_args={"use_pure": True})
3738
)
3839
except AttributeError:
39-
Session = None
40+
Session = lambda: None
4041

4142

4243
def _send_using_new_connection(transport_type: str, queue: str, message: dict) -> None:
@@ -56,7 +57,7 @@ def __init__(self, transport_type):
5657
self.transport = workflows.transport.lookup(transport_type)()
5758
self.transport.connect()
5859
self.feedback_queue = ""
59-
self.ispyb = ispyb.open()
60+
self.ispyb = ispyb.open() if os.getenv("ISYPB_CREDENTIALS") else None
6061
self._connection_callback: Callable | None = None
6162

6263
def reconnect(self):
@@ -273,6 +274,9 @@ def do_buffer_lookup(self, app_id: int, uuid: int) -> Optional[int]:
273274

274275
def _get_session() -> sqlalchemy.orm.Session:
275276
db = Session()
277+
if db is None:
278+
yield None
279+
return
276280
try:
277281
yield db
278282
finally:
@@ -288,8 +292,10 @@ def get_session_id(
288292
proposal_code: str,
289293
proposal_number: str,
290294
visit_number: str,
291-
db: sqlalchemy.orm.Session,
292-
) -> int:
295+
db: sqlalchemy.orm.Session | None,
296+
) -> int | None:
297+
if db is None:
298+
return None
293299
query = (
294300
db.query(BLSession)
295301
.join(Proposal)
@@ -347,7 +353,11 @@ def get_sub_samples_from_visit(visit: str, db: sqlalchemy.orm.Session) -> List[S
347353
return res
348354

349355

350-
def get_all_ongoing_visits(microscope: str, db: sqlalchemy.orm.Session) -> list[Visit]:
356+
def get_all_ongoing_visits(
357+
microscope: str, db: sqlalchemy.orm.Session | None
358+
) -> list[Visit]:
359+
if db is None:
360+
return []
351361
query = (
352362
db.query(BLSession)
353363
.join(Proposal)

0 commit comments

Comments
 (0)