diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e5e00abd..e7df0d74 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -44,7 +44,7 @@ jobs: - name: Install dependencies run: | pip install pytest pytest-cov pytest-mock pytest-datafiles - pip install .[torch] + pip install .[murfey_db,torch] - name: Install fork of CCP-EM pipeliner run: | diff --git a/pyproject.toml b/pyproject.toml index f6378d39..7cb39ca2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,9 @@ dev = [ "pytest-datafiles", "pytest-mock", ] +murfey_db = [ + "murfey", +] torch = [ "membrain-seg", "topaz-em", @@ -100,6 +103,7 @@ torch = [ Images = "cryoemservices.services.images:Images" MembrainSeg = "cryoemservices.services.membrain_seg:MembrainSeg" MotionCorr = "cryoemservices.services.motioncorr:MotionCorr" + MurfeyDBConnector = "cryoemservices.services.murfey_db_connector:MurfeyDBConnector" NodeCreator = "cryoemservices.services.node_creator:NodeCreator" PostProcess = "cryoemservices.services.postprocess:PostProcess" ProcessRecipe = "cryoemservices.services.process_recipe:ProcessRecipe" diff --git a/src/cryoemservices/services/ispyb_connector.py b/src/cryoemservices/services/ispyb_connector.py index 5130a3ad..54c78cf4 100644 --- a/src/cryoemservices/services/ispyb_connector.py +++ b/src/cryoemservices/services/ispyb_connector.py @@ -1,6 +1,7 @@ from __future__ import annotations import time +from typing import Callable, Optional import ispyb.sqlalchemy import sqlalchemy.orm @@ -19,14 +20,13 @@ class EMISPyB(CommonService): _logger_name = "cryoemservices.services.ispyb_connector" # ispyb connection details - ispyb = None - _ispyb_sessionmaker = None + _database_session_maker: Optional[Callable] = None def initializing(self): """Subscribe the ISPyB connector queue. Received messages must be acknowledged. 
Prepare ISPyB database connection.""" service_config = config_from_file(self._environment["config"]) - self._ispyb_sessionmaker = sqlalchemy.orm.sessionmaker( + self._database_session_maker = sqlalchemy.orm.sessionmaker( bind=sqlalchemy.create_engine( ispyb.sqlalchemy.url(credentials=service_config.ispyb_credentials), connect_args={"use_pure": True}, @@ -41,6 +41,10 @@ def initializing(self): allow_non_recipe_messages=True, ) + @staticmethod + def get_command(command): + return getattr(ispyb_commands, command, None) + def insert_into_ispyb(self, rw, header, message): """Do something with ISPyB.""" if not rw: @@ -99,7 +103,7 @@ def parameters(parameter): self.log.error("Received message is not a valid ISPyB command") rw.transport.nack(header) return - command_function = getattr(ispyb_commands, command, None) + command_function = self.get_command(command) if not command_function: self.log.error("Received unknown ISPyB command (%s)", command) rw.transport.nack(header) @@ -111,7 +115,7 @@ def parameters(parameter): self.log.info("Running ISPyB call %s", command) try: - with self._ispyb_sessionmaker() as session: + with self._database_session_maker() as session: result = command_function( message=message, parameters=parameters, diff --git a/src/cryoemservices/services/murfey_db_connector.py b/src/cryoemservices/services/murfey_db_connector.py new file mode 100644 index 00000000..8ae683ed --- /dev/null +++ b/src/cryoemservices/services/murfey_db_connector.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from murfey.server.murfey_db import get_murfey_db_session +from workflows.recipe import wrap_subscribe + +from cryoemservices.services.ispyb_connector import EMISPyB +from cryoemservices.util import ispyb_commands, murfey_db_commands + + +class MurfeyDBConnector(EMISPyB): + """ + A service that receives information to be written to the Murfey database. 
+ Designed to override the ispyb connector in cases where ispyb is not available + """ + + # Logger name + _logger_name = "cryoemservices.services.murfey_db_connector" + + def initializing(self): + """Subscribe the ISPyB connector queue. Received messages must be + acknowledged. Prepare Murfey database connection.""" + self._database_session_maker = get_murfey_db_session + self.log.info("ISPyB service ready") + wrap_subscribe( + self._transport, + self._environment["queue"] or "ispyb_connector", + self.insert_into_ispyb, + acknowledgement=True, + allow_non_recipe_messages=True, + ) + + @staticmethod + def get_command(command): + if getattr(murfey_db_commands, command, None): + return getattr(murfey_db_commands, command, None) + return getattr(ispyb_commands, command, None) diff --git a/src/cryoemservices/util/ispyb_commands.py b/src/cryoemservices/util/ispyb_commands.py index 0f25a370..963b4e30 100644 --- a/src/cryoemservices/util/ispyb_commands.py +++ b/src/cryoemservices/util/ispyb_commands.py @@ -171,33 +171,6 @@ def buffer(message: dict, parameters: Callable, session: sqlalchemy.orm.Session) return result -def _get_movie_id( - full_path, - data_collection_id, - db_session, -): - logger.info( - f"Looking for Movie ID. 
Movie name: {full_path} DCID: {data_collection_id}" - ) - movie_name = Path(full_path).stem.replace("_motion_corrected", "") - mv_query = db_session.query(models.Movie).filter( - models.Movie.dataCollectionId == data_collection_id, - ) - results = mv_query.all() - correct_result = None - if results: - for result in results: - if movie_name in result.movieFullPath: - correct_result = result - if correct_result: - mvid = correct_result.movieId - logger.info(f"Found Movie ID: {mvid}") - return mvid - else: - logger.error(f"Unable to find movie ID for {movie_name}") - return None - - def insert_movie(message: dict, parameters: Callable, session: sqlalchemy.orm.Session): try: foil_hole_id = ( @@ -683,11 +656,25 @@ def full_parameters(param): if full_parameters("movie_id"): mvid = full_parameters("movie_id") else: - mvid = _get_movie_id(full_parameters("path"), full_parameters("dcid"), session) - - if not mvid: - logger.error("No movie ID for tilt image alignment") - return False + movie_name = Path(full_parameters("path")).stem.replace("_motion_corrected", "") + logger.info( + f"Looking for Movie ID. 
Movie name: {movie_name} DCID: {full_parameters('dcid')}" + ) + results = ( + session.query(models.Movie) + .filter( + models.Movie.dataCollectionId == full_parameters("dcid"), + ) + .all() + ) + mvid = None + for result in results: + if movie_name in result.movieFullPath: + mvid = result.movieId + logger.info(f"Found Movie ID: {mvid}") + if not mvid: + logger.error(f"No movie ID for {movie_name} in tilt image alignment") + return False try: values = models.TiltImageAlignment( diff --git a/src/cryoemservices/util/murfey_db_commands.py b/src/cryoemservices/util/murfey_db_commands.py new file mode 100644 index 00000000..0e871eb3 --- /dev/null +++ b/src/cryoemservices/util/murfey_db_commands.py @@ -0,0 +1,160 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Callable + +import murfey.util.processing_db as models +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.orm import Session + +from cryoemservices.util import ispyb_commands + +logger = logging.getLogger("cryoemservices.util.murfey_db_commands") +logger.setLevel(logging.INFO) + + +def buffer(message: dict, parameters: Callable, session: Session): + """ + Override of the buffer command, + which just uses the given value rather than doing a lookup + """ + if isinstance(message.get("buffer_command"), dict): + command = message.get("buffer_command", {}).get("ispyb_command", "") + else: + command = None + if not command: + logger.error(f"Invalid buffer call: no buffer command in {message}") + return False + + # Look up the command in this file or in the ispyb_commands file + command_function = globals().get(command) or getattr(ispyb_commands, command, None) + if not command_function: + logger.error(f"Invalid buffer call: unknown command {command} in {message}") + return False + + program_id = parameters("program_id") + if not program_id: + logger.error("Invalid buffer call: program_id is undefined") + return False + + # Prepare command: Resolve all 
references + if message.get("buffer_lookup") and not isinstance(message["buffer_lookup"], dict): + logger.error("Invalid buffer call: buffer_lookup is not a dictionary") + return False + for entry in list(message.get("buffer_lookup", [])): + # Copy value into command variables + message["buffer_command"][entry] = message["buffer_lookup"][entry] + del message["buffer_lookup"][entry] + + # Run the actual command + result = command_function( + message=message["buffer_command"], + parameters=parameters, + session=session, + ) + + # If the command did not succeed then propagate failure + if not result or not result.get("success"): + logger.warning("Buffered command failed") + return result + + # Finally, propagate result + result["store_result"] = message.get("store_result") + return result + + +def insert_movie(message: dict, parameters: Callable, session: Session): + """Do nothing as this will already be present in the murfey database""" + return {"success": True, "return_value": 0} + + +def insert_cryoem_initial_model(message: dict, parameters: Callable, session: Session): + """Override of initial model search for if the link table does not exist""" + + def full_parameters(param): + return ispyb_commands.parameters_with_replacement(param, message, parameters) + + try: + values_im = models.CryoemInitialModel( + resolution=full_parameters("resolution"), + numberOfParticles=full_parameters("number_of_particles"), + particleClassificationId=full_parameters("particle_classification_id"), + ) + session.add(values_im) + session.commit() + logger.info( + f"Created CryoEM Initial Model record {values_im.cryoemInitialModelId}" + ) + return {"success": True, "return_value": values_im.cryoemInitialModelId} + except SQLAlchemyError as e: + logger.error( + f"Inserting CryoEM Initial Model entry caused exception {e}", + exc_info=True, + ) + return False + + +def insert_tilt_image_alignment(message: dict, parameters: Callable, session: Session): + """Override of tilt image alignment 
for murfey movie table""" + + def full_parameters(param): + return ispyb_commands.parameters_with_replacement(param, message, parameters) + + if full_parameters("movie_id"): + mvid = full_parameters("movie_id") + else: + movie_name = Path(full_parameters("path")).stem.replace("_motion_corrected", "") + logger.info( + f"Looking for Movie ID. Movie name: {movie_name} DCID: {full_parameters('dcid')}" + ) + results = ( + session.query(models.Movie) + .filter( + models.Movie.data_collection_id == full_parameters("dcid"), + ) + .all() + ) + mvid = None + for result in results: + if movie_name in result.path: + mvid = result.murfey_id + logger.info(f"Found Movie ID: {mvid}") + if not mvid: + logger.error(f"No movie ID for {movie_name} in tilt image alignment") + return False + + try: + values = models.TiltImageAlignment( + movieId=mvid, + tomogramId=full_parameters("tomogram_id"), + defocusU=full_parameters("defocus_u"), + defocusV=full_parameters("defocus_v"), + psdFile=full_parameters("psd_file"), + resolution=full_parameters("resolution"), + fitQuality=full_parameters("fit_quality"), + refinedMagnification=full_parameters("refined_magnification"), + refinedTiltAngle=full_parameters("refined_tilt_angle"), + refinedTiltAxis=full_parameters("refined_tilt_axis"), + residualError=full_parameters("residual_error"), + ) + session.add(values) + session.commit() + logger.info(f"Created tilt image alignment record for {values.tomogramId}") + return {"success": True, "return_value": values.tomogramId} + except SQLAlchemyError as e: + logger.error( + f"Inserting Tilt Image Alignment entry caused exception {e}", + exc_info=True, + ) + return False + + +def update_processing_status(message: dict, parameters: Callable, session: Session): + """Do nothing if AutoProcProgram status table does not exist""" + return {"success": True, "return_value": 0} + + +def register_processing(message: dict, parameters: Callable, session: Session): + """Do nothing if AutoProcProgram status table does 
not exist""" + return {"success": True, "return_value": 0} diff --git a/tests/services/test_murfey_db_service.py b/tests/services/test_murfey_db_service.py new file mode 100644 index 00000000..b1c94905 --- /dev/null +++ b/tests/services/test_murfey_db_service.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +from unittest import mock + +import pytest +from workflows.transport.offline_transport import OfflineTransport + +from cryoemservices.services import murfey_db_connector + + +@pytest.fixture +def offline_transport(mocker): + transport = OfflineTransport() + mocker.spy(transport, "send") + return transport + + +@mock.patch("cryoemservices.services.ispyb_connector.MockRW") +@mock.patch("cryoemservices.services.murfey_db_connector.get_murfey_db_session") +@mock.patch( + "cryoemservices.services.murfey_db_connector.murfey_db_commands.insert_movie" +) +def test_murfey_db_service_override( + mock_command, mock_murfey_db_session, mock_rw, offline_transport, tmp_path +): + """ + Send a test message to the murfey database connector service + for a function with a murfey-specific version + """ + header = { + "message-id": mock.sentinel, + "subscription": mock.sentinel, + } + ispyb_test_message = { + "ispyb_command": "insert_movie", + } + mock_command.return_value = {"success": True, "return_value": "dummy_result"} + + # Set up the mock service and call it + service = murfey_db_connector.MurfeyDBConnector( + environment={"config": f"{tmp_path}/config.yaml", "queue": ""}, + transport=offline_transport, + ) + service.initializing() + service.insert_into_ispyb(None, header=header, message=ispyb_test_message) + + mock_murfey_db_session.assert_called() + mock_murfey_db_session().__enter__.assert_called() + + mock_command.assert_called_with( + message={ + "expiry_time": mock.ANY, + "ispyb_command": "insert_movie", + }, + parameters=mock.ANY, + session=mock_murfey_db_session().__enter__(), + ) + + # Check that the correct messages were sent + 
mock_rw.assert_called_with(offline_transport) + mock_rw().set_default_channel.assert_called_with("output") + mock_rw().send.assert_called_with({"result": "dummy_result"}) + + +@mock.patch("cryoemservices.services.ispyb_connector.MockRW") +@mock.patch("cryoemservices.services.murfey_db_connector.get_murfey_db_session") +@mock.patch( + "cryoemservices.services.murfey_db_connector.ispyb_commands.insert_motion_correction" +) +def test_murfey_db_service_use_ispyb_version( + mock_command, mock_murfey_db_session, mock_rw, offline_transport, tmp_path +): + """ + Send a test message to the murfey database connector service + for a function that is not overridden for murfey + """ + header = { + "message-id": mock.sentinel, + "subscription": mock.sentinel, + } + ispyb_test_message = { + "ispyb_command": "insert_motion_correction", + } + mock_command.return_value = {"success": True, "return_value": "dummy_result"} + + # Set up the mock service and call it + service = murfey_db_connector.MurfeyDBConnector( + environment={"config": f"{tmp_path}/config.yaml", "queue": ""}, + transport=offline_transport, + ) + service.initializing() + service.insert_into_ispyb(None, header=header, message=ispyb_test_message) + + mock_murfey_db_session.assert_called() + mock_murfey_db_session().__enter__.assert_called() + + mock_command.assert_called_with( + message={ + "expiry_time": mock.ANY, + "ispyb_command": "insert_motion_correction", + }, + parameters=mock.ANY, + session=mock_murfey_db_session().__enter__(), + ) + + # Check that the correct messages were sent + mock_rw.assert_called_with(offline_transport) + mock_rw().set_default_channel.assert_called_with("output") + mock_rw().send.assert_called_with({"result": "dummy_result"}) diff --git a/tests/util/test_murfey_db_commands.py b/tests/util/test_murfey_db_commands.py new file mode 100644 index 00000000..46378c06 --- /dev/null +++ b/tests/util/test_murfey_db_commands.py @@ -0,0 +1,238 @@ +from unittest import mock + +import pytest + +from 
cryoemservices.util import murfey_db_commands + +bad_buffer_commands = [ + {}, + {"buffer_command": "not a dict"}, + {"buffer_command": {"ispyb_command": None}}, + {"buffer_command": {"ispyb_command": "non_existant"}}, + { + "buffer_command": { + "ispyb_command": "insert_movie", + }, + "buffer_lookup": "not a dict", + }, +] + + +@pytest.mark.parametrize("commands", bad_buffer_commands) +def test_buffer_bad_commands(commands): + def ispyb_parameters(p): + return p + + assert ( + murfey_db_commands.buffer(commands, ispyb_parameters, mock.MagicMock()) is False + ) + + +@mock.patch( + "cryoemservices.util.murfey_db_commands.ispyb_commands.insert_particle_classification" +) +def test_buffer(mock_insert_class, tmp_path): + """ + Test that multipart message calls run reinjection. + Use a 3D classification example for this + """ + + def mock_parameters(p): + params = {"program_id": 1} + return params.get(p) + + ispyb_test_message = { + "buffer_command": {"ispyb_command": "insert_particle_classification"}, + "buffer_lookup": {"particle_classification_group_id": 5}, + "buffer_store": 10, + "class_distribution": "0.4", + "class_image_full_path": ("/path/to/Class3D/job015/run_it025_class001.mrc"), + "class_number": 1, + "estimated_resolution": 12.2, + "ispyb_command": "buffer", + "overall_fourier_completeness": 1.0, + "particles_per_class": 20000.0, + "rotation_accuracy": "30.3", + "translation_accuracy": "33.3", + "store_result": "value_to_store", + } + + # Mock up the individual insert command + mock_insert_class.return_value = {"success": True, "return_value": "dummy_class"} + mock_db_session = mock.MagicMock() + + result = murfey_db_commands.buffer( + ispyb_test_message, mock_parameters, mock_db_session + ) + + mock_insert_class.assert_called_once_with( + message={ + "ispyb_command": "insert_particle_classification", + "particle_classification_group_id": 5, + }, + parameters=mock_parameters, + session=mock_db_session, + ) + assert result["success"] + assert 
result["return_value"] == "dummy_class" + assert result["store_result"] == "value_to_store" + + +def test_insert_movie(): + def mock_parameters(p): + return {}.get(p) + + mock_session = mock.MagicMock() + return_value = murfey_db_commands.insert_movie({}, mock_parameters, mock_session) + assert return_value.get("success") + assert return_value["return_value"] == 0 + + +@mock.patch("cryoemservices.util.murfey_db_commands.models") +def test_insert_initial_model(mock_models): + def mock_model_parameters(p): + model_parameters = { + "cryoem_initial_model_id": "None", + "particle_classification_id": 401, + "resolution": 15.1, + "number_of_particles": 21000, + } + return model_parameters[p] + + mock_session = mock.MagicMock() + return_value = murfey_db_commands.insert_cryoem_initial_model( + {}, mock_model_parameters, mock_session + ) + assert return_value.get("success") + assert return_value["return_value"] + + mock_models.CryoemInitialModel.assert_called_with( + resolution=15.1, + numberOfParticles=21000, + particleClassificationId=401, + ) + mock_session.add.assert_called_once() + mock_session.commit.assert_called_once() + + +@mock.patch("cryoemservices.util.murfey_db_commands.models") +def test_insert_tilts_has_movie(mock_models): + def mock_tilt_parameters(p): + tilt_parameters = { + "dcid": 10, + "movie_id": 1, + "tomogram_id": 801, + "defocus_u": 20000, + "defocus_v": 25000, + "psd_file": "/path/to/psd/file", + "resolution": 10.4, + "fit_quality": 0.15, + "refined_magnification": 18000, + "refined_tilt_angle": -12, + "refined_tilt_axis": 83.6, + "residual_error": 0.6, + } + return tilt_parameters[p] + + mock_session = mock.MagicMock() + return_value = murfey_db_commands.insert_tilt_image_alignment( + {}, mock_tilt_parameters, mock_session + ) + assert return_value.get("success") + assert return_value["return_value"] + + mock_models.TiltImageAlignment.assert_called_with( + movieId=1, + tomogramId=801, + defocusU=20000, + defocusV=25000, + 
psdFile="/path/to/psd/file", + resolution=10.4, + fitQuality=0.15, + refinedMagnification=18000, + refinedTiltAngle=-12, + refinedTiltAxis=83.6, + residualError=0.6, + ) + mock_session.add.assert_called() + mock_session.commit.assert_called() + + +@mock.patch("cryoemservices.util.murfey_db_commands.models") +def test_insert_tilts_without_movie(mock_models): + def mock_tilt_parameters(p): + tilt_parameters = { + "dcid": 10, + "movie_id": None, + "path": "/path/to/test_movie_motion_corrected.mrc", + "tomogram_id": 801, + "defocus_u": 20000, + "defocus_v": 25000, + "psd_file": "/path/to/psd/file", + "resolution": 10.4, + "fit_quality": 0.15, + "refined_magnification": 18000, + "refined_tilt_angle": -12, + "refined_tilt_axis": 83.6, + "residual_error": 0.6, + } + return tilt_parameters[p] + + class MockMovieParameters: + data_collection_id = 10 + path = "/original/path/test_movie.mrc" + murfey_id = 5 + + mock_session = mock.MagicMock() + mock_session.query().filter().all.return_value = [MockMovieParameters] + + return_value = murfey_db_commands.insert_tilt_image_alignment( + {}, mock_tilt_parameters, mock_session + ) + assert return_value.get("success") + assert return_value["return_value"] + + # Check the movie lookup + assert mock_session.query.call_count == 2 + assert mock_session.query().filter.call_count == 2 + assert mock_session.query().filter().all.call_count == 1 + + mock_models.TiltImageAlignment.assert_called_with( + movieId=5, + tomogramId=801, + defocusU=20000, + defocusV=25000, + psdFile="/path/to/psd/file", + resolution=10.4, + fitQuality=0.15, + refinedMagnification=18000, + refinedTiltAngle=-12, + refinedTiltAxis=83.6, + residualError=0.6, + ) + mock_session.add.assert_called() + mock_session.commit.assert_called() + + +def test_update_processing_status(): + def mock_parameters(p): + return {}.get(p) + + mock_session = mock.MagicMock() + return_value = murfey_db_commands.update_processing_status( + {}, mock_parameters, mock_session + ) + assert 
return_value.get("success") + assert return_value["return_value"] == 0 + + +def test_register_processing(): + def mock_parameters(p): + return {}.get(p) + + mock_session = mock.MagicMock() + return_value = murfey_db_commands.register_processing( + {}, mock_parameters, mock_session + ) + assert return_value.get("success") + assert return_value["return_value"] == 0