Skip to content

Commit 2273bdf

Browse files
committed
python: support multiple queues in JobStats
Problem: The JobStats class only supports a single queue. Support a list (or other iterable) for the queues parameter of the JobStats class. Move the list of stats to a set of class variables to avoid repetition. Add an internal QueueStats class to hold specific queue stats, and store any queue-specific stats in a dictionary of these objects by queue name. If multiple queues are specified in the JobStats constructor, just sum the individual queue stats together before creating any derived statistics.
1 parent 4b264f8 commit 2273bdf

File tree

1 file changed

+67
-29
lines changed

1 file changed

+67
-29
lines changed

src/bindings/python/flux/job/stats.py

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
# SPDX-License-Identifier: LGPL-3.0
99
###############################################################
1010

11+
import itertools
12+
1113
from flux.rpc import RPC
1214

1315

@@ -33,44 +35,80 @@ class JobStats:
3335
3436
"""
3537

38+
states = (
39+
"depend",
40+
"priority",
41+
"sched",
42+
"run",
43+
"cleanup",
44+
"inactive",
45+
"total",
46+
)
47+
stats = (
48+
"successful",
49+
"failed",
50+
"timeout",
51+
"canceled",
52+
"inactive_purged",
53+
)
54+
derived_stats = (
55+
"pending",
56+
"running",
57+
"active",
58+
)
59+
60+
class QueueStats:
61+
"""Container for a set of per-queue stats"""
62+
63+
def __init__(self, stats=None):
64+
if stats is None:
65+
self.queue_name = ""
66+
for stat in itertools.chain(JobStats.states, JobStats.stats):
67+
setattr(self, stat, 0)
68+
return
69+
70+
self.queue_name = stats["name"] if "name" in stats else "all"
71+
# Move all stats to top-level attributes of this object
72+
for state in JobStats.states:
73+
setattr(self, state, stats["job_states"][state])
74+
for stat in JobStats.stats:
75+
setattr(self, stat, stats[stat])
76+
77+
def __iadd__(self, other):
78+
self.queue_name += "," + other.queue_name
79+
for stat in itertools.chain(JobStats.states, JobStats.stats):
80+
setattr(self, stat, getattr(self, stat) + getattr(other, stat))
81+
return self
82+
3683
def __init__(self, handle, queue=None):
3784
"""Initialize a JobStats object with Flux handle ``handle``"""
3885
self.handle = handle
39-
self.queue = queue
86+
self.queues = []
87+
# Accept queue as str or iterable
88+
if queue is not None:
89+
self.queues.extend([queue] if isinstance(queue, str) else queue)
4090
self.callback = None
4191
self.cb_kwargs = {}
42-
for attr in [
43-
"depend",
44-
"priority",
45-
"sched",
46-
"run",
47-
"cleanup",
48-
"inactive",
49-
"successful",
50-
"failed",
51-
"timeout",
52-
"canceled",
53-
"inactive_purged",
54-
"pending",
55-
"running",
56-
"active",
57-
]:
92+
for attr in itertools.chain(
93+
JobStats.states, JobStats.stats, JobStats.derived_stats
94+
):
5895
setattr(self, attr, -1)
5996

6097
def _update_cb(self, rpc):
6198
resp = rpc.get()
62-
if self.queue:
63-
tmpstat = None
64-
if resp["queues"]:
65-
tmpstat = [x for x in resp["queues"] if x["name"] == self.queue]
66-
if not tmpstat:
67-
raise ValueError(f"no stats available for queue {self.queue}")
68-
resp = tmpstat[0]
69-
70-
for state, count in resp["job_states"].items():
71-
setattr(self, state, count)
72-
for state in ["successful", "failed", "timeout", "canceled", "inactive_purged"]:
73-
setattr(self, state, resp[state])
99+
queues = {x["name"]: self.QueueStats(x) for x in resp["queues"]}
100+
if self.queues:
101+
qstat = self.QueueStats()
102+
for queue in self.queues:
103+
try:
104+
qstat += queues[queue]
105+
except KeyError:
106+
raise ValueError(f"no stats available for queue {queue}")
107+
else:
108+
qstat = self.QueueStats(resp)
109+
110+
for attr in itertools.chain(JobStats.states, JobStats.stats):
111+
setattr(self, attr, getattr(qstat, attr))
74112

75113
# Compute some stats for convenience:
76114
# pylint: disable=attribute-defined-outside-init

0 commit comments

Comments
 (0)