|
@@ -11,13 +11,12 @@
 from importlib.resources import files
 from pathlib import Path
 from threading import Thread
-from typing import Any, Dict, List, NamedTuple, Tuple
+from typing import Any, Dict, List, Literal, NamedTuple, Tuple
 
 import graypy
 import mrcfile
 import numpy as np
 import uvicorn
-import workflows
 from backports.entry_points_selectable import entry_points
 from fastapi import Request
 from fastapi.templating import Jinja2Templates
@@ -42,6 +41,7 @@
 from sqlalchemy.orm.exc import ObjectDeletedError
 from sqlmodel import Session, create_engine, select
 from werkzeug.utils import secure_filename
+from workflows.transport.pika_transport import PikaTransport
 
 import murfey
 import murfey.server.ispyb
@@ -225,6 +225,7 @@ def respond_with_template(
 
 
 def run():
+    # Set up argument parser
     parser = argparse.ArgumentParser(description="Start the Murfey server")
     parser.add_argument(
         "--host",
@@ -273,28 +274,29 @@ def run(): |
273 | 274 | help="Increase logging output verbosity", |
274 | 275 | default=0, |
275 | 276 | ) |
| 277 | + # Parse and separate known and unknown args |
| 278 | + args, unknown = parser.parse_known_args() |
276 | 279 |
|
| 280 | + # Load the security configuration |
277 | 281 | security_config = get_security_config() |
278 | | - # setup logging |
| 282 | + |
| 283 | + # Set up GrayLog handler if provided in the configuration |
279 | 284 | if security_config.graylog_host: |
280 | 285 | handler = graypy.GELFUDPHandler( |
281 | 286 | security_config.graylog_host, security_config.graylog_port, level_names=True |
282 | 287 | ) |
283 | 288 | root_logger = logging.getLogger() |
284 | 289 | root_logger.addHandler(handler) |
285 | | - |
286 | 290 | # Install a log filter to all existing handlers. |
287 | 291 | LogFilter.install() |
288 | 292 |
|
289 | | - workflows.transport.load_configuration_file(security_config.rabbitmq_credentials) |
290 | | - |
291 | | - args = parser.parse_args() |
292 | | - |
293 | | - # Set up Zocalo connection |
294 | 293 | if args.demo: |
| 294 | + # Run in demo mode with no connections set up |
295 | 295 | os.environ["MURFEY_DEMO"] = "1" |
296 | 296 | else: |
297 | | - _set_up_transport(args.transport) |
| 297 | + # Load RabbitMQ configuration and set up the connection |
| 298 | + PikaTransport().load_configuration_file(security_config.rabbitmq_credentials) |
| 299 | + _set_up_transport("PikaTransport") |
298 | 300 |
|
299 | 301 | # Set up logging now that the desired verbosity is known |
300 | 302 | _set_up_logging(quiet=args.quiet, verbosity=args.verbose) |
@@ -393,7 +395,7 @@ def _set_up_logging(quiet: bool, verbosity: int):
         logging.getLogger(logger_name).setLevel(log_level)
 
 
-def _set_up_transport(transport_type):
+def _set_up_transport(transport_type: Literal["PikaTransport"]):
     global _transport_object
     _transport_object = TransportManager(transport_type)
 
@@ -2471,19 +2473,16 @@ def _save_bfactor(message: dict, _db=murfey_db, demo: bool = False):
         _transport_object.send(
             "ispyb_connector",
             {
-                "parameters": {
-                    "ispyb_command": "buffer",
-                    "buffer_lookup": {
-                        "particle_classification_id": refined_class_uuid,
-                    },
-                    "buffer_command": {
-                        "ispyb_command": "insert_particle_classification"
-                    },
-                    "program_id": message["program_id"],
-                    "bfactor_fit_intercept": str(bfactor_fitting[1]),
-                    "bfactor_fit_linear": str(bfactor_fitting[0]),
+                "ispyb_command": "buffer",
+                "buffer_lookup": {
+                    "particle_classification_id": refined_class_uuid,
                 },
-                "content": {"dummy": "dummy"},
+                "buffer_command": {
+                    "ispyb_command": "insert_particle_classification"
+                },
+                "program_id": message["program_id"],
+                "bfactor_fit_intercept": str(bfactor_fitting[1]),
+                "bfactor_fit_linear": str(bfactor_fitting[0]),
             },
             new_connection=True,
         )
@@ -2639,7 +2638,9 @@ def feedback_callback(header: dict, message: dict) -> None:
                 cassetteSlot=message.get("sample"),
             )
             if _transport_object:
-                atlas_id = _transport_object.do_insert_atlas(atlas_record)
+                atlas_id = _transport_object.do_insert_atlas(atlas_record)[
+                    "return_value"
+                ]
             murfey_dcg = db.DataCollectionGroup(
                 id=dcgid,
                 atlas_id=atlas_id,
@@ -2756,7 +2757,6 @@ def feedback_callback(header: dict, message: dict) -> None:
         elif message["register"] == "processing_job":
             murfey_session_id = message["session_id"]
             logger.info("registering processing job")
-            assert isinstance(global_state["data_collection_ids"], dict)
             dc = murfey_db.exec(
                 select(db.DataCollection, db.DataCollectionGroup)
                 .where(db.DataCollection.dcg_id == db.DataCollectionGroup.id)
|