|
5 | 5 | import math |
6 | 6 | import os |
7 | 7 | import subprocess |
| 8 | +import sys |
8 | 9 | import time |
9 | 10 | from datetime import datetime |
10 | 11 | from functools import partial, singledispatch |
|
51 | 52 | get_microscope, |
52 | 53 | get_security_config, |
53 | 54 | ) |
54 | | -from murfey.workflows.clem.register_results import ( |
55 | | - register_lif_preprocessing_result, |
56 | | - register_tiff_preprocessing_result, |
57 | | -) |
58 | 55 |
|
59 | 56 | try: |
60 | 57 | from murfey.server.ispyb import TransportManager # Session |
|
65 | 62 | from murfey.util.spa_params import default_spa_parameters |
66 | 63 | from murfey.util.state import global_state |
67 | 64 |
|
| 65 | +# Import entry_points depending on Python version |
| 66 | +if sys.version_info < (3, 10): |
| 67 | + from importlib_metadata import EntryPoint, entry_points |
| 68 | +else: |
| 69 | + from importlib.metadata import EntryPoint, entry_points |
| 70 | + |
68 | 71 | try: |
69 | 72 | from importlib.resources import files # type: ignore |
70 | 73 | except ImportError: |
@@ -2968,15 +2971,28 @@ def feedback_callback(header: dict, message: dict) -> None: |
2968 | 2971 | if _transport_object: |
2969 | 2972 | _transport_object.transport.ack(header) |
2970 | 2973 | return None |
2971 | | - elif message["register"] == "register_lif_preprocessing_result": |
2972 | | - register_lif_preprocessing_result(message, murfey_db) |
2973 | | - if _transport_object: |
2974 | | - _transport_object.transport.ack(header) |
2975 | | - return None |
2976 | | - elif message["register"] == "register_tiff_preprocessing_result": |
2977 | | - register_tiff_preprocessing_result(message, murfey_db) |
2978 | | - if _transport_object: |
2979 | | - _transport_object.transport.ack(header) |
| 2974 | + elif message["register"] in ( |
| 2975 | + "register_lif_preprocessing_result", |
| 2976 | + "register_tiff_preprocessing_result", |
| 2977 | + ): |
| 2978 | + murfey_workflows = list( |
| 2979 | + entry_points().select( |
| 2980 | + group="murfey.workflows.clem", name=message["register"] |
| 2981 | + ) |
| 2982 | + ) |
| 2983 | + # Run the workflow if a match is found |
| 2984 | + if len(murfey_workflows) > 0: |
| 2985 | + workflow: EntryPoint = murfey_workflows[0] |
| 2986 | + workflow.load()( |
| 2987 | + message=message, |
| 2988 | + db=murfey_db, |
| 2989 | + ) |
| 2990 | + if _transport_object: |
| 2991 | + _transport_object.transport.ack(header) |
| 2992 | + # Nack message if no workflow found for message |
| 2993 | + else: |
| 2994 | + if _transport_object: |
| 2995 | + _transport_object.transport.nack(header) |
2980 | 2996 | return None |
2981 | 2997 | if _transport_object: |
2982 | 2998 | _transport_object.transport.nack(header, requeue=False) |
|
0 commit comments