Skip to content

Commit 95512ff

Browse files
committed
'ExtendedRecord' class and '_register' set of functions can now be removed
1 parent 4591e76 commit 95512ff

File tree

1 file changed

+1
-72
lines changed

1 file changed

+1
-72
lines changed

src/murfey/server/feedback.py

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,14 @@
1212
import subprocess
1313
import time
1414
from datetime import datetime
15-
from functools import partial, singledispatch
15+
from functools import partial
1616
from importlib.metadata import EntryPoint # For type hinting only
1717
from pathlib import Path
1818
from typing import Dict, List, NamedTuple, Tuple
1919

2020
import mrcfile
2121
import numpy as np
2222
from backports.entry_points_selectable import entry_points
23-
from ispyb.sqlalchemy._auto_db_schema import (
24-
AutoProcProgram,
25-
Base,
26-
DataCollection,
27-
DataCollectionGroup,
28-
ProcessingJob,
29-
)
3023
from sqlalchemy import func
3124
from sqlalchemy.exc import (
3225
InvalidRequestError,
@@ -62,11 +55,6 @@
6255
murfey_db = None
6356

6457

65-
class ExtendedRecord(NamedTuple):
66-
record: Base # type: ignore
67-
record_params: List[Base] # type: ignore
68-
69-
7058
class JobIDs(NamedTuple):
7159
dcgid: int
7260
dcid: int
@@ -2264,65 +2252,6 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
22642252
return None
22652253

22662254

2267-
@singledispatch
2268-
def _register(record, header: dict, **kwargs):
2269-
raise NotImplementedError(f"Not method to register {record} or type {type(record)}")
2270-
2271-
2272-
@_register.register
2273-
def _(record: Base, header: dict, **kwargs): # type: ignore
2274-
if not murfey.server._transport_object:
2275-
logger.error(
2276-
f"No transport object found when processing record {record}. Message header: {header}"
2277-
)
2278-
return None
2279-
try:
2280-
if isinstance(record, DataCollection):
2281-
return murfey.server._transport_object.do_insert_data_collection(
2282-
record, **kwargs
2283-
)["return_value"]
2284-
if isinstance(record, DataCollectionGroup):
2285-
return murfey.server._transport_object.do_insert_data_collection_group(
2286-
record
2287-
)["return_value"]
2288-
if isinstance(record, ProcessingJob):
2289-
return murfey.server._transport_object.do_create_ispyb_job(record)[
2290-
"return_value"
2291-
]
2292-
if isinstance(record, AutoProcProgram):
2293-
return murfey.server._transport_object.do_update_processing_status(record)[
2294-
"return_value"
2295-
]
2296-
# session = Session()
2297-
# session.add(record)
2298-
# session.commit()
2299-
# murfey.server._transport_object.transport.ack(header, requeue=False)
2300-
return getattr(record, record.__table__.primary_key.columns[0].name)
2301-
2302-
except SQLAlchemyError as e:
2303-
logger.error(f"Murfey failed to insert ISPyB record {record}", e, exc_info=True)
2304-
# murfey.server._transport_object.transport.nack(header)
2305-
return None
2306-
except AttributeError as e:
2307-
logger.error(
2308-
f"Murfey could not find primary key when inserting record {record}",
2309-
e,
2310-
exc_info=True,
2311-
)
2312-
return None
2313-
2314-
2315-
@_register.register
2316-
def _(extended_record: ExtendedRecord, header: dict, **kwargs):
2317-
if not murfey.server._transport_object:
2318-
raise ValueError(
2319-
"Transport object should not be None if a database record is being updated"
2320-
)
2321-
return murfey.server._transport_object.do_create_ispyb_job(
2322-
extended_record.record, params=extended_record.record_params
2323-
)["return_value"]
2324-
2325-
23262255
def feedback_listen():
23272256
if murfey.server._transport_object:
23282257
if not murfey.server._transport_object.feedback_queue:

0 commit comments

Comments
 (0)