Skip to content

Commit b13c4ad

Browse files
committed
Modified workflow entry points to return a dictionary so that we can control message requeueing logic as well
1 parent 8687bba commit b13c4ad

File tree

7 files changed

+26
-24
lines changed

7 files changed

+26
-24
lines changed

src/murfey/server/feedback.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2462,17 +2462,17 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
24622462
return None
24632463
# Run the workflow if a match is found
24642464
workflow: EntryPoint = workflows[0]
2465-
result = workflow.load()(
2465+
result: dict[str, bool] = workflow.load()(
24662466
message=message,
24672467
murfey_db=_db,
24682468
)
24692469
if murfey.server._transport_object:
2470-
if result:
2470+
if result.get("success", False):
24712471
murfey.server._transport_object.transport.ack(header)
24722472
else:
24732473
# Send it directly to DLQ without trying to rerun it
24742474
murfey.server._transport_object.transport.nack(
2475-
header, requeue=False
2475+
header, requeue=result.get("requeue", False)
24762476
)
24772477
if not result:
24782478
logger.error(

src/murfey/workflows/clem/align_and_merge.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,4 @@ def submit_cluster_request(
7878
},
7979
new_connection=True,
8080
)
81-
return True
81+
return {"success": True}

src/murfey/workflows/clem/register_align_and_merge_results.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def parse_stringified_list(cls, value):
4141

4242
def register_align_and_merge_result(
4343
message: dict, murfey_db: Session, demo: bool = False
44-
) -> bool:
44+
) -> dict[str, bool]:
4545
"""
4646
session_id (recipe)
4747
register (wrapper)
@@ -69,13 +69,13 @@ def register_align_and_merge_result(
6969
"Invalid type for align-and-merge processing result: "
7070
f"{type(message['result'])}"
7171
)
72-
return False
72+
return {"success": False, "requeue": False}
7373
except Exception:
7474
logger.error(
7575
"Exception encountered when parsing align-and-merge processing result: \n"
7676
f"{traceback.format_exc()}"
7777
)
78-
return False
78+
return {"success": False, "requeue": False}
7979

8080
# Outer try-finally block for tidying up database-related section of function
8181
try:
@@ -103,8 +103,8 @@ def register_align_and_merge_result(
103103
f"{result.series_name!r}: \n"
104104
f"{traceback.format_exc()}"
105105
)
106-
return False
106+
return {"success": False, "requeue": False}
107107

108-
return True
108+
return {"success": True}
109109
finally:
110110
murfey_db.close()

src/murfey/workflows/clem/register_preprocessing_results.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class CLEMPreprocessingResult(BaseModel):
5656
extent: list[float]
5757

5858

59-
def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
59+
def run(message: dict, murfey_db: Session, demo: bool = False) -> dict[str, bool]:
6060
session_id: int = (
6161
int(message["session_id"])
6262
if not isinstance(message["session_id"], int)
@@ -72,13 +72,13 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
7272
logger.error(
7373
f"Invalid type for TIFF preprocessing result: {type(message['result'])}"
7474
)
75-
return False
75+
return {"success": False, "requeue": False}
7676
except Exception:
7777
logger.error(
7878
"Exception encountered when parsing TIFF preprocessing result: \n"
7979
f"{traceback.format_exc()}"
8080
)
81-
return False
81+
return {"success": False, "requeue": False}
8282

8383
# Outer try-finally block for tidying up database-related section of function
8484
try:
@@ -181,7 +181,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
181181
f"{result.series_name!r}: \n"
182182
f"{traceback.format_exc()}"
183183
)
184-
return False
184+
return {"success": False, "requeue": False}
185185

186186
# Load instrument name
187187
try:
@@ -197,7 +197,7 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
197197
f"Error requesting data from database for {result.series_name!r} series: \n"
198198
f"{traceback.format_exc()}"
199199
)
200-
return False
200+
return {"success": False, "requeue": False}
201201

202202
# Construct list of files to use for image alignment and merging steps
203203
image_combos_to_process = [
@@ -234,12 +234,12 @@ def run(message: dict, murfey_db: Session, demo: bool = False) -> bool:
234234
f"{result.series_name!r} series",
235235
exc_info=True,
236236
)
237-
return False
237+
return {"success": False, "requeue": False}
238238
logger.info(
239239
"Successfully requested image alignment and merging job for "
240240
f"{result.series_name!r} series"
241241
)
242-
return True
242+
return {"success": True}
243243

244244
finally:
245245
murfey_db.close()

src/murfey/workflows/notifications/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
def notification_setup(
1010
message: dict, murfey_db: Session, num_instances_between_triggers: int = 500
11-
) -> bool:
11+
) -> dict[str, bool]:
1212
parameters: Dict[str, Tuple[float, float]] = {}
1313
for k in message.keys():
1414
parameter_name = ""
@@ -48,4 +48,4 @@ def notification_setup(
4848
murfey_db.add_all(existing_notification_parameters + new_notification_parameters)
4949
murfey_db.commit()
5050
murfey_db.close()
51-
return True
51+
return {"success": True}

src/murfey/workflows/spa/flush_spa_preprocess.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,17 @@ def _flush_position_analysis(
306306
return register_foil_hole(session_id, gs.id, foil_hole_parameters, murfey_db)
307307

308308

309-
def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False) -> bool:
309+
def flush_spa_preprocess(
310+
message: dict, murfey_db: Session, demo: bool = False
311+
) -> dict[str, bool]:
310312
session_id = message["session_id"]
311313
stashed_files = murfey_db.exec(
312314
select(PreprocessStash)
313315
.where(PreprocessStash.session_id == session_id)
314316
.where(PreprocessStash.tag == message["tag"])
315317
).all()
316318
if not stashed_files:
317-
return True
319+
return {"success": True}
318320

319321
murfey_session = murfey_db.exec(
320322
select(MurfeySession).where(MurfeySession.id == message["session_id"])
@@ -348,7 +350,7 @@ def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False)
348350
logger.warning(
349351
f"No SPA processing parameters found for client processing job ID {collected_ids[2].id}"
350352
)
351-
return False
353+
return {"success": False, "requeue": False}
352354

353355
murfey_ids = _murfey_id(
354356
collected_ids[3].id,
@@ -444,4 +446,4 @@ def flush_spa_preprocess(message: dict, murfey_db: Session, demo: bool = False)
444446
)
445447
murfey_db.commit()
446448
murfey_db.close()
447-
return True
449+
return {"success": True}

src/murfey/workflows/spa/picking.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ def _check_notifications(message: dict, murfey_db: Session) -> None:
440440
return None
441441

442442

443-
def particles_picked(message: dict, murfey_db: Session) -> bool:
443+
def particles_picked(message: dict, murfey_db: Session) -> dict[str, bool]:
444444
movie = murfey_db.exec(
445445
select(Movie).where(Movie.murfey_id == message["motion_correction_id"])
446446
).one()
@@ -460,4 +460,4 @@ def particles_picked(message: dict, murfey_db: Session) -> bool:
460460
processing_job=_pj_id(message["program_id"], murfey_db)
461461
).inc()
462462
_check_notifications(message, murfey_db)
463-
return True
463+
return {"success": True}

0 commit comments

Comments
 (0)