Skip to content

Commit b4ff138

Browse files
committed
prometheus metrics
1 parent a368798 commit b4ff138

File tree

2 files changed

+132
-15
lines changed

2 files changed

+132
-15
lines changed

batchtools/br.py

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# pyright: reportUninitializedInstanceVariable=false
22
from typing import cast
3-
from typing_extensions import override
3+
from typing_extensions import override, Optional
44

55
import argparse
66
import os
@@ -17,6 +17,8 @@
1717
from .helpers import pretty_print
1818
from .helpers import oc_delete
1919
from .file_setup import prepare_context
20+
from .prom_metrics import render_histogram_samples, now_rfc3339, push_metrics_text
21+
from .prom_metrics import PROMETHEUS_INSTANCE
2022

2123

2224
class CreateJobCommandArgs(argparse.Namespace):
@@ -218,7 +220,27 @@ def run(args: argparse.Namespace):
218220
oc.create(job_body)
219221
print(f"Job: {job_name} created successfully. Now checking pod...")
220222
if args.wait:
221-
log_job_output(job_name=job_name, wait=True, timeout=args.timeout)
223+
# log_job_output(job_name=job_name, wait=True, timeout=args.timeout)
224+
result_phase, run_elapsed = log_job_output(
225+
job_name=job_name, wait=True, timeout=args.timeout
226+
)
227+
228+
if run_elapsed is not None:
229+
labels = {
230+
"job": job_name,
231+
"gpu": args.gpu,
232+
"queue": queue_name,
233+
"result": result_phase.lower(), # succeeded|failed (or unknown)
234+
"instance": PROMETHEUS_INSTANCE,
235+
}
236+
metrics = render_histogram_samples(
237+
metric_base="batch_duration_seconds",
238+
elapsed=run_elapsed,
239+
labels=labels,
240+
)
241+
# Include a helpful comment block and a timestamp line the scraper can ignore
242+
preface = [f"# Batch run observed at {now_rfc3339()}"]
243+
push_metrics_text("\n".join(preface) + metrics)
222244

223245
except oc.OpenShiftPythonException as e:
224246
sys.exit(f"Error occurred while creating job: {e}")
@@ -243,36 +265,51 @@ def get_pod_status(pod_name: str | None = None) -> str:
243265
return pod.model.status.phase or "Unknown"
244266

245267

246-
def log_job_output(job_name: str, *, wait: bool, timeout: int | None) -> None:
268+
def log_job_output(job_name: str, *, wait: bool, timeout: int | None) -> tuple[str, Optional[float]]:
    """
    Wait until the job's pod completes (Succeeded/Failed), then print its logs once.

    Returns a ``(result, run_seconds)`` pair: ``result`` is a lower-cased phase
    ("succeeded", "failed", "timeout", or "unknown") and ``run_seconds`` is the
    observed wall-clock time the pod spent Running (``None`` when it could not
    be measured, e.g. on timeout or when the pod was never seen Running).
    """
    matching = oc.selector("pod", labels={"job-name": job_name}).objects()
    if not matching:
        print(f"No pods found for job {job_name}")
        return ("unknown", None)

    pod = matching[0]
    pod_name = pod.model.metadata.name

    running_since = None  # monotonic timestamp of the first poll that saw Running
    final_phase = "unknown"

    if wait:
        poll_start = time.monotonic()
        while True:
            phase = get_pod_status(pod_name)

            # Remember when the pod was first observed Running. Granularity is
            # the 2-second poll interval, so the duration is an approximation.
            if running_since is None and phase == "Running":
                running_since = time.monotonic()

            if phase == "Succeeded" or phase == "Failed":
                final_phase = phase
                print(f"Pod, {pod_name} finished with phase={phase}")
                break

            if timeout:
                if time.monotonic() - poll_start > timeout:
                    print(f"Timeout waiting for pod {pod_name} to complete")
                    print(f"Deleting job {job_name}")
                    oc_delete("job", job_name)
                    # No run duration is available for a timed-out job.
                    return ("timeout", None)

            # Sleep between polls to avoid hammering the API server.
            time.sleep(2)

    print(pretty_print(pod))

    # Derive the Running-phase duration, if the pod was ever seen Running.
    if running_since is not None:
        run_elapsed = time.monotonic() - running_since
    elif final_phase.lower() == "failed":
        # Failed before ever Running (e.g. stuck in Pending): report zero.
        run_elapsed = 0.0
    else:
        run_elapsed = None

    return (final_phase.lower(), run_elapsed)

batchtools/prom_metrics.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import os, socket, subprocess, datetime
2+
from typing import Optional
3+
from datetime import timezone
4+
from datetime import datetime
5+
6+
7+
PROMETHEUS_PUSH_URL = 'http://localhost:8080/metrics'
8+
PROMETHEUS_INSTANCE = os.environ.get("PROMETHEUS_INSTANCE", socket.gethostname())
9+
10+
DEFAULT_BUCKETS = [1, 5, 10, 30, 60, 120, 300, 600, 1200, 1800, 3600] # +Inf is implicit
11+
12+
13+
def now_rfc3339() -> str:
    """Return the current UTC time as an RFC 3339 / ISO-8601 timestamp string."""
    current = datetime.now(timezone.utc)
    return current.isoformat()
15+
16+
def render_histogram_samples(
    metric_base: str,
    elapsed: float,
    labels: dict[str, str],
    buckets: Optional[list[float]] = None,
) -> str:
    """
    Render Prometheus text exposition for a single observation into a histogram.

    Produces cumulative ``_bucket`` lines plus ``_sum`` and ``_count`` in the
    text exposition format (version 0.0.4).

    Args:
        metric_base: Base metric name, e.g. 'batch_duration_seconds'.
        elapsed: The single observed value (seconds).
        labels: Labels attached to every sample line.
        buckets: Upper bounds of the finite histogram buckets; defaults to
            DEFAULT_BUCKETS. The '+Inf' bucket is always appended implicitly.

    Returns:
        The rendered sample lines, newline-joined, ending with a newline.
    """
    if buckets is None:
        # Resolve the module default lazily instead of using a mutable
        # default argument shared across calls.
        buckets = DEFAULT_BUCKETS

    def lbl(extra: Optional[dict[str, str]] = None) -> str:
        # Render a stable, sorted, quoted label set like {a="1",b="2"}.
        merged = {**labels}
        if extra:
            merged.update(extra)
        inner = ",".join(f'{k}="{v}"' for k, v in sorted(merged.items()))
        return f"{{{inner}}}"

    # Header
    lines = [
        f"# HELP {metric_base} Runtime of batch job (seconds)",
        f"# TYPE {metric_base} histogram",
    ]

    # Cumulative bucket counts: a single observation contributes 1 to every
    # bucket whose upper bound is >= elapsed, else 0.
    for bound in buckets:
        hit = 1 if elapsed <= bound else 0
        # BUG FIX: the bucket line must carry the full merged label set via
        # lbl(); previously a raw dict literal was interpolated, which emitted
        # invalid exposition text (Python repr with single quotes) and dropped
        # the job/gpu/queue/result/instance labels from every finite bucket.
        bucket_labels = lbl({"le": str(bound)})
        lines.append(f"{metric_base}_bucket{bucket_labels} {hit}")
    # The +Inf bucket always contains the single observation.
    lines.append(f'{metric_base}_bucket{lbl({"le": "+Inf"})} 1')

    # Sum and count for the one observation.
    # Use full precision float; Prometheus parser handles standard float format
    lines.append(f"{metric_base}_sum{lbl()} {elapsed}")
    lines.append(f"{metric_base}_count{lbl()} 1")

    return "\n".join(lines) + "\n"
57+
58+
59+
def push_metrics_text(text: str) -> None:
    """
    POST text exposition to PROMETHEUS_PUSH_URL if set. Uses curl (as requested).

    When no push URL is configured the payload is printed instead of sent; a
    failed push is reported on stdout but never raised to the caller.
    """
    if not PROMETHEUS_PUSH_URL:
        print("PROM: PROMETHEUS_PUSH_URL not set; skipping push. Below is the metrics payload:\n")
        print(text)
        return

    cmd = [
        "curl", "-sS", "-X", "POST", PROMETHEUS_PUSH_URL,
        "--data-binary", "@-",
        "-H", "Content-Type: text/plain; version=0.0.4",
    ]
    try:
        # The payload is piped through stdin ('--data-binary @-');
        # check=False so a non-zero curl exit does not raise here.
        completed = subprocess.run(cmd, input=text.encode("utf-8"), check=False)
    except Exception as e:
        print(f"PROM: failed to push metrics via curl: {e}")
        return

    if completed.returncode == 0:
        print("PROM: metrics successfully pushed.")
    else:
        print(f"PROM: curl returned nonzero exit {completed.returncode}; metrics not confirmed.")

0 commit comments

Comments
 (0)