Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
- name: Install dependencies
run: |
pip install pytest pytest-cov pytest-mock pytest-datafiles
pip install .[torch]
pip install .[murfey_db,torch]
- name: Install fork of CCP-EM pipeliner
run: |
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ dev = [
"pytest-datafiles",
"pytest-mock",
]
murfey_db = [
"murfey",
]
torch = [
"membrain-seg",
"topaz-em",
Expand Down Expand Up @@ -100,6 +103,7 @@ torch = [
Images = "cryoemservices.services.images:Images"
MembrainSeg = "cryoemservices.services.membrain_seg:MembrainSeg"
MotionCorr = "cryoemservices.services.motioncorr:MotionCorr"
MurfeyDBConnector = "cryoemservices.services.murfey_db_connector:MurfeyDBConnector"
NodeCreator = "cryoemservices.services.node_creator:NodeCreator"
PostProcess = "cryoemservices.services.postprocess:PostProcess"
ProcessRecipe = "cryoemservices.services.process_recipe:ProcessRecipe"
Expand Down
14 changes: 9 additions & 5 deletions src/cryoemservices/services/ispyb_connector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import time
from typing import Callable, Optional

import ispyb.sqlalchemy
import sqlalchemy.orm
Expand All @@ -19,14 +20,13 @@ class EMISPyB(CommonService):
_logger_name = "cryoemservices.services.ispyb_connector"

# ispyb connection details
ispyb = None
_ispyb_sessionmaker = None
_database_session_maker: Optional[Callable] = None

def initializing(self):
"""Subscribe the ISPyB connector queue. Received messages must be
acknowledged. Prepare ISPyB database connection."""
service_config = config_from_file(self._environment["config"])
self._ispyb_sessionmaker = sqlalchemy.orm.sessionmaker(
self._database_session_maker = sqlalchemy.orm.sessionmaker(
bind=sqlalchemy.create_engine(
ispyb.sqlalchemy.url(credentials=service_config.ispyb_credentials),
connect_args={"use_pure": True},
Expand All @@ -41,6 +41,10 @@ def initializing(self):
allow_non_recipe_messages=True,
)

@staticmethod
def get_command(command):
return getattr(ispyb_commands, command, None)

def insert_into_ispyb(self, rw, header, message):
"""Do something with ISPyB."""
if not rw:
Expand Down Expand Up @@ -99,7 +103,7 @@ def parameters(parameter):
self.log.error("Received message is not a valid ISPyB command")
rw.transport.nack(header)
return
command_function = getattr(ispyb_commands, command, None)
command_function = self.get_command(command)
if not command_function:
self.log.error("Received unknown ISPyB command (%s)", command)
rw.transport.nack(header)
Expand All @@ -111,7 +115,7 @@ def parameters(parameter):

self.log.info("Running ISPyB call %s", command)
try:
with self._ispyb_sessionmaker() as session:
with self._database_session_maker() as session:
result = command_function(
message=message,
parameters=parameters,
Expand Down
36 changes: 36 additions & 0 deletions src/cryoemservices/services/murfey_db_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from __future__ import annotations

from murfey.server.murfey_db import get_murfey_db_session
from workflows.recipe import wrap_subscribe

from cryoemservices.services.ispyb_connector import EMISPyB
from cryoemservices.util import ispyb_commands, murfey_db_commands


class MurfeyDBConnector(EMISPyB):
"""
A service that receives information to be written to the Murfey database.
Designed to override the ispyb connector in cases where ispyb is not available
"""

# Logger name
_logger_name = "cryoemservices.services.murfey_db_connector"

def initializing(self):
"""Subscribe the ISPyB connector queue. Received messages must be
acknowledged. Prepare Murfey database connection."""
self._database_session_maker = get_murfey_db_session
self.log.info("ISPyB service ready")
wrap_subscribe(
self._transport,
self._environment["queue"] or "ispyb_connector",
self.insert_into_ispyb,
acknowledgement=True,
allow_non_recipe_messages=True,
)

@staticmethod
def get_command(command):
if getattr(murfey_db_commands, command, None):
return getattr(murfey_db_commands, command, None)
return getattr(ispyb_commands, command, None)
51 changes: 19 additions & 32 deletions src/cryoemservices/util/ispyb_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,33 +171,6 @@ def buffer(message: dict, parameters: Callable, session: sqlalchemy.orm.Session)
return result


def _get_movie_id(
full_path,
data_collection_id,
db_session,
):
logger.info(
f"Looking for Movie ID. Movie name: {full_path} DCID: {data_collection_id}"
)
movie_name = Path(full_path).stem.replace("_motion_corrected", "")
mv_query = db_session.query(models.Movie).filter(
models.Movie.dataCollectionId == data_collection_id,
)
results = mv_query.all()
correct_result = None
if results:
for result in results:
if movie_name in result.movieFullPath:
correct_result = result
if correct_result:
mvid = correct_result.movieId
logger.info(f"Found Movie ID: {mvid}")
return mvid
else:
logger.error(f"Unable to find movie ID for {movie_name}")
return None


def insert_movie(message: dict, parameters: Callable, session: sqlalchemy.orm.Session):
try:
foil_hole_id = (
Expand Down Expand Up @@ -683,11 +656,25 @@ def full_parameters(param):
if full_parameters("movie_id"):
mvid = full_parameters("movie_id")
else:
mvid = _get_movie_id(full_parameters("path"), full_parameters("dcid"), session)

if not mvid:
logger.error("No movie ID for tilt image alignment")
return False
movie_name = Path(full_parameters("path")).stem.replace("_motion_corrected", "")
logger.info(
f"Looking for Movie ID. Movie name: {movie_name} DCID: {full_parameters('dcid')}"
)
results = (
session.query(models.Movie)
.filter(
models.Movie.dataCollectionId == full_parameters("dcid"),
)
.all()
)
mvid = None
for result in results:
if movie_name in result.movieFullPath:
logger.info(f"Found Movie ID: {mvid}")
mvid = result.movieId
if not mvid:
logger.error(f"No movie ID for {movie_name} in tilt image alignment")
return False

try:
values = models.TiltImageAlignment(
Expand Down
160 changes: 160 additions & 0 deletions src/cryoemservices/util/murfey_db_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
from __future__ import annotations

import logging
from pathlib import Path
from typing import Callable

import murfey.util.processing_db as models
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from cryoemservices.util import ispyb_commands

logger = logging.getLogger("cryoemservices.util.murfey_db_commands")
logger.setLevel(logging.INFO)


def buffer(message: dict, parameters: Callable, session: Session):
"""
Override of the buffer command,
which just uses the given value rather than doing a lookup
"""
if isinstance(message.get("buffer_command"), dict):
command = message.get("buffer_command", {}).get("ispyb_command", "")
else:
command = None
if not command:
logger.error(f"Invalid buffer call: no buffer command in {message}")
return False

# Look up the command in this file or in the ispyb_commands file
command_function = globals().get(command) or getattr(ispyb_commands, command, None)
if not command_function:
logger.error(f"Invalid buffer call: unknown command {command} in {message}")
return False

program_id = parameters("program_id")
if not program_id:
logger.error("Invalid buffer call: program_id is undefined")
return False

# Prepare command: Resolve all references
if message.get("buffer_lookup") and not isinstance(message["buffer_lookup"], dict):
logger.error("Invalid buffer call: buffer_lookup is not a dictionary")
return False
for entry in list(message.get("buffer_lookup", [])):
# Copy value into command variables
message["buffer_command"][entry] = message["buffer_lookup"][entry]
del message["buffer_lookup"][entry]

# Run the actual command
result = command_function(
message=message["buffer_command"],
parameters=parameters,
session=session,
)

# If the command did not succeed then propagate failure
if not result or not result.get("success"):
logger.warning("Buffered command failed")
return result

# Finally, propagate result
result["store_result"] = message.get("store_result")
return result


def insert_movie(message: dict, parameters: Callable, session: Session):
"""Do nothing as this will already be present in the murfey database"""
return {"success": True, "return_value": 0}


def insert_cryoem_initial_model(message: dict, parameters: Callable, session: Session):
"""Override of initial model search for if the link table does not exist"""

def full_parameters(param):
return ispyb_commands.parameters_with_replacement(param, message, parameters)

try:
values_im = models.CryoemInitialModel(
resolution=full_parameters("resolution"),
numberOfParticles=full_parameters("number_of_particles"),
particleClassificationId=full_parameters("particle_classification_id"),
)
session.add(values_im)
session.commit()
logger.info(
f"Created CryoEM Initial Model record {values_im.cryoemInitialModelId}"
)
return {"success": True, "return_value": values_im.cryoemInitialModelId}
except SQLAlchemyError as e:
logger.error(
f"Inserting CryoEM Initial Model entry caused exception {e}",
exc_info=True,
)
return False


def insert_tilt_image_alignment(message: dict, parameters: Callable, session: Session):
"""Override of tilt image alignment for murfey movie table"""

def full_parameters(param):
return ispyb_commands.parameters_with_replacement(param, message, parameters)

if full_parameters("movie_id"):
mvid = full_parameters("movie_id")
else:
movie_name = Path(full_parameters("path")).stem.replace("_motion_corrected", "")
logger.info(
f"Looking for Movie ID. Movie name: {movie_name} DCID: {full_parameters('dcid')}"
)
results = (
session.query(models.Movie)
.filter(
models.Movie.data_collection_id == full_parameters("dcid"),
)
.all()
)
mvid = None
for result in results:
if movie_name in result.path:
logger.info(f"Found Movie ID: {mvid}")
mvid = result.murfey_id
if not mvid:
logger.error(f"No movie ID for {movie_name} in tilt image alignment")
return False

try:
values = models.TiltImageAlignment(
movieId=mvid,
tomogramId=full_parameters("tomogram_id"),
defocusU=full_parameters("defocus_u"),
defocusV=full_parameters("defocus_v"),
psdFile=full_parameters("psd_file"),
resolution=full_parameters("resolution"),
fitQuality=full_parameters("fit_quality"),
refinedMagnification=full_parameters("refined_magnification"),
refinedTiltAngle=full_parameters("refined_tilt_angle"),
refinedTiltAxis=full_parameters("refined_tilt_axis"),
residualError=full_parameters("residual_error"),
)
session.add(values)
session.commit()
logger.info(f"Created tilt image alignment record for {values.tomogramId}")
return {"success": True, "return_value": values.tomogramId}
except SQLAlchemyError as e:
logger.error(
f"Inserting Tilt Image Alignment entry caused exception {e}",
exc_info=True,
)
return False


def update_processing_status(message: dict, parameters: Callable, session: Session):
"""Do nothing if AutoProcProgram status table does not exist"""
return {"success": True, "return_value": 0}


def register_processing(message: dict, parameters: Callable, session: Session):
"""Do nothing if AutoProcProgram status table does not exist"""
return {"success": True, "return_value": 0}
Loading
Loading