Skip to content

Commit e3ec1ae

Browse files
committed
Encased prometheus metric deletion calls in utility function to notify about failures
1 parent a44ec98 commit e3ec1ae

File tree

2 files changed

+84
-19
lines changed

2 files changed

+84
-19
lines changed

src/murfey/server/api/__init__.py

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
Proposal,
2323
)
2424
from PIL import Image
25+
from prometheus_client import Counter, Gauge
2526
from pydantic import BaseModel
2627
from sqlalchemy import func
2728
from sqlalchemy.exc import OperationalError
@@ -50,7 +51,7 @@
5051
from murfey.server.api.spa import _cryolo_model_path
5152
from murfey.server.gain import Camera, prepare_eer_gain, prepare_gain
5253
from murfey.server.murfey_db import murfey_db
53-
from murfey.util import secure_path
54+
from murfey.util import safe_run, secure_path
5455
from murfey.util.config import MachineConfig, from_file, settings
5556
from murfey.util.db import (
5657
AutoProcProgram,
@@ -1618,34 +1619,57 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db):
16181619
).all()
16191620
# Don't remove prometheus metrics if there are other sessions using them
16201621
if len(sessions_for_visit) == 1:
1621-
try:
1622-
prom.monitoring_switch.remove(session.visit)
1623-
except KeyError:
1624-
pass
1622+
safe_run(
1623+
prom.monitoring_switch.remove,
1624+
args=(session.visit,),
1625+
label="monitoring_switch",
1626+
)
16251627
rsync_instances = db.exec(
16261628
select(RsyncInstance).where(RsyncInstance.session_id == session_id)
16271629
).all()
16281630
for ri in rsync_instances:
1629-
try:
1630-
prom.seen_files.remove(ri.source, session.visit)
1631-
prom.transferred_files.remove(ri.source, session.visit)
1632-
prom.transferred_files_bytes.remove(ri.source, session.visit)
1633-
prom.seen_data_files.remove(ri.source, session.visit)
1634-
prom.transferred_data_files.remove(ri.source, session.visit)
1635-
prom.transferred_data_files_bytes.remove(ri.source, session.visit)
1636-
except KeyError:
1637-
pass
1631+
safe_run(
1632+
prom.seen_files.remove,
1633+
args=(ri.source, session.visit),
1634+
label="seen_files",
1635+
)
1636+
safe_run(
1637+
prom.transferred_files.remove,
1638+
args=(ri.source, session.visit),
1639+
label="transferred_files",
1640+
)
1641+
safe_run(
1642+
prom.transferred_files_bytes.remove,
1643+
args=(ri.source, session.visit),
1644+
label="transferred_files_bytes",
1645+
)
1646+
safe_run(
1647+
prom.seen_data_files.remove,
1648+
args=(ri.source, session.visit),
1649+
label="seen_data_files",
1650+
)
1651+
safe_run(
1652+
prom.transferred_data_files.remove,
1653+
args=(ri.source, session.visit),
1654+
label="transferred_data_files",
1655+
)
1656+
safe_run(
1657+
prom.transferred_data_files_bytes.remove,
1658+
args=(ri.source, session.visit),
1659+
label="transferred_data_file_bytes",
1660+
)
16381661
collected_ids = db.exec(
16391662
select(DataCollectionGroup, DataCollection, ProcessingJob)
16401663
.where(DataCollectionGroup.session_id == session_id)
16411664
.where(DataCollection.dcg_id == DataCollectionGroup.id)
16421665
.where(ProcessingJob.dc_id == DataCollection.id)
16431666
).all()
16441667
for c in collected_ids:
1645-
try:
1646-
prom.preprocessed_movies.remove(c[2].id)
1647-
except KeyError:
1648-
continue
1668+
safe_run(
1669+
prom.preprocessed_movies.remove,
1670+
args=(c[2].id,),
1671+
label="preprocessed_movies",
1672+
)
16491673
db.delete(session)
16501674
db.commit()
16511675
return
@@ -1957,3 +1981,25 @@ def update_current_gain_ref(
19571981
session.current_gain_ref = new_gain_ref.path
19581982
db.add(session)
19591983
db.commit()
1984+
1985+
1986+
@router.get("/prometheus/{metric_name}")
def inspect_prometheus_metrics(
    metric_name: str,
):
    """
    Return the current contents of one of the metrics defined on the 'prom' module.

    The attribute called 'metric_name' is looked up on 'prom'. If nothing is found,
    or what is found is not a Counter or Gauge, a LookupError is raised.

    For a metric that exposes a '_metrics' mapping, the result is a dictionary
    keyed by enumeration index; each entry maps the metric's label names to the
    label values of one child metric, plus a "value" key holding that child's
    current value. Otherwise the result is a single {"value": ...} dictionary.
    """
    candidate: Optional[Counter | Gauge] = getattr(prom, metric_name, None)
    # isinstance() is False for None, so this also covers the "not found" case
    if not isinstance(candidate, (Counter, Gauge)):
        raise LookupError("No matching metric was found")

    # No per-label children: report the metric's own value directly
    if not hasattr(candidate, "_metrics"):
        return {"value": candidate._value.get()}

    # One entry per child metric, in the iteration order of the '_metrics' mapping
    return {
        index: {
            **dict(zip(candidate._labelnames, label_values)),
            "value": child._value.get(),
        }
        for index, (label_values, child) in enumerate(candidate._metrics.items())
    }

src/murfey/util/__init__.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55
from queue import Queue
66
from threading import Thread
7-
from typing import Optional
7+
from typing import Any, Callable, Optional
88
from uuid import uuid4
99

1010
from werkzeug.utils import secure_filename
@@ -132,3 +132,22 @@ def filter(self, record: logging.LogRecord) -> bool:
132132
if "." not in logger_name:
133133
return False
134134
logger_name = logger_name.rsplit(".", maxsplit=1)[0]
135+
136+
137+
def safe_run(
138+
func: Callable,
139+
args: list | tuple = [],
140+
kwargs: dict[str, Any] = {},
141+
label: str = "",
142+
):
143+
"""
144+
A wrapper to encase individual functions in try-except blocks so that a warning
145+
is raised if the function fails, but the process continues as normal otherwise.
146+
"""
147+
try:
148+
return func(*args, **kwargs)
149+
except Exception:
150+
logger.warning(
151+
f"Function {func.__name__!r} failed to run for object {label!r}",
152+
exc_info=True,
153+
)

0 commit comments

Comments
 (0)