Skip to content

Commit f9bcdbe

Browse files
authored
Merge pull request #6149 from grondo/issue#6141
cmd: support multiple queues in `flux jobs`, `pgrep`, and `pkill` `-q, --queue` option
2 parents 3dc3207 + 14c2571 commit f9bcdbe

File tree

7 files changed

+92
-42
lines changed

7 files changed

+92
-42
lines changed

doc/man1/flux-jobs.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ OPTIONS
4646

4747
List jobs with a specific job name.
4848

49-
.. option:: -q, --queue=[QUEUE]
49+
.. option:: -q, --queue=QUEUE[,...]
5050

51-
List jobs in a specific queue.
51+
List jobs in a specific queue or queues. Multiple queues may be separated
52+
by a comma or by using the :option:`-q, --queue` option multiple times.
5253

5354
.. option:: -c, --count=N
5455

doc/man1/flux-pgrep.rst

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ OPTIONS
5656
results can be listed separated by comma. See the JOB STATUS section
5757
of the :man1:`flux-jobs` manual for more detail.
5858

59-
.. option:: -q, --queue=QUEUE
59+
.. option:: -q, --queue=QUEUE[,...]
6060

61-
Only include jobs in the named queue *QUEUE*.
61+
Only include jobs in the named queue *QUEUE*. Multiple queues may be
62+
specified as a comma-separated list, or by using the :option:`--queue`
63+
option multiple times.
6264

6365
.. option:: -c, --count=N
6466

src/bindings/python/flux/job/list.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import errno
1111
import os
1212
import pwd
13+
from collections.abc import Iterable
1314

1415
import flux.constants
1516
from flux.future import WaitAllFuture
@@ -58,7 +59,11 @@ def job_list(
5859
if name:
5960
constraint["and"].append({"name": [name]})
6061
if queue:
61-
constraint["and"].append({"queue": [queue]})
62+
if isinstance(queue, str):
63+
queue = [queue]
64+
if not isinstance(queue, Iterable):
65+
raise ValueError("queue parameter must be a string or iterable")
66+
constraint["and"].append({"queue": list(queue)})
6267
if states and results:
6368
tmp = {"or": []}
6469
tmp["or"].append({"states": [states]})
@@ -201,7 +206,7 @@ class JobList:
201206
:max_entries: Maximum number of jobs to return
202207
:since: Limit jobs to those that have been active since a given timestamp.
203208
:name: Limit jobs to those with a specific name.
204-
:queue: Limit jobs to those submitted to a specific queue.
209+
:queue: Limit jobs to those submitted to a specific queue or queues
205210
"""
206211

207212
# pylint: disable=too-many-instance-attributes

src/bindings/python/flux/job/stats.py

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
# SPDX-License-Identifier: LGPL-3.0
99
###############################################################
1010

11+
import itertools
12+
1113
from flux.rpc import RPC
1214

1315

@@ -33,44 +35,80 @@ class JobStats:
3335
3436
"""
3537

38+
states = (
39+
"depend",
40+
"priority",
41+
"sched",
42+
"run",
43+
"cleanup",
44+
"inactive",
45+
"total",
46+
)
47+
stats = (
48+
"successful",
49+
"failed",
50+
"timeout",
51+
"canceled",
52+
"inactive_purged",
53+
)
54+
derived_stats = (
55+
"pending",
56+
"running",
57+
"active",
58+
)
59+
60+
class QueueStats:
61+
"""Container for a set of per-queue stats"""
62+
63+
def __init__(self, stats=None):
64+
if stats is None:
65+
self.queue_name = ""
66+
for stat in itertools.chain(JobStats.states, JobStats.stats):
67+
setattr(self, stat, 0)
68+
return
69+
70+
self.queue_name = stats["name"] if "name" in stats else "all"
71+
# Move all stats to top-level attributes of this object
72+
for state in JobStats.states:
73+
setattr(self, state, stats["job_states"][state])
74+
for stat in JobStats.stats:
75+
setattr(self, stat, stats[stat])
76+
77+
def __iadd__(self, other):
78+
self.queue_name += "," + other.queue_name
79+
for stat in itertools.chain(JobStats.states, JobStats.stats):
80+
setattr(self, stat, getattr(self, stat) + getattr(other, stat))
81+
return self
82+
3683
def __init__(self, handle, queue=None):
3784
"""Initialize a JobStats object with Flux handle ``handle``"""
3885
self.handle = handle
39-
self.queue = queue
86+
self.queues = []
87+
# Accept queue as str or iterable
88+
if queue is not None:
89+
self.queues.extend([queue] if isinstance(queue, str) else queue)
4090
self.callback = None
4191
self.cb_kwargs = {}
42-
for attr in [
43-
"depend",
44-
"priority",
45-
"sched",
46-
"run",
47-
"cleanup",
48-
"inactive",
49-
"successful",
50-
"failed",
51-
"timeout",
52-
"canceled",
53-
"inactive_purged",
54-
"pending",
55-
"running",
56-
"active",
57-
]:
92+
for attr in itertools.chain(
93+
JobStats.states, JobStats.stats, JobStats.derived_stats
94+
):
5895
setattr(self, attr, -1)
5996

6097
def _update_cb(self, rpc):
6198
resp = rpc.get()
62-
if self.queue:
63-
tmpstat = None
64-
if resp["queues"]:
65-
tmpstat = [x for x in resp["queues"] if x["name"] == self.queue]
66-
if not tmpstat:
67-
raise ValueError(f"no stats available for queue {self.queue}")
68-
resp = tmpstat[0]
69-
70-
for state, count in resp["job_states"].items():
71-
setattr(self, state, count)
72-
for state in ["successful", "failed", "timeout", "canceled", "inactive_purged"]:
73-
setattr(self, state, resp[state])
99+
queues = {x["name"]: self.QueueStats(x) for x in resp["queues"]}
100+
if self.queues:
101+
qstat = self.QueueStats()
102+
for queue in self.queues:
103+
try:
104+
qstat += queues[queue]
105+
except KeyError:
106+
raise ValueError(f"no stats available for queue {queue}")
107+
else:
108+
qstat = self.QueueStats(resp)
109+
110+
for attr in itertools.chain(JobStats.states, JobStats.stats):
111+
setattr(self, attr, getattr(qstat, attr))
74112

75113
# Compute some stats for convenience:
76114
# pylint: disable=attribute-defined-outside-init

src/cmd/flux-jobs.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,10 @@ def parse_args():
265265
parser.add_argument(
266266
"-q",
267267
"--queue",
268-
action=FilterAction,
269-
type=str,
270-
metavar="QUEUE",
271-
help="Limit output to specific queue",
268+
action=FilterActionSetUpdate,
269+
default=set(),
270+
metavar="QUEUE,...",
271+
help="Limit output to specific queue or queues",
272272
)
273273
parser.add_argument(
274274
"-o",

src/cmd/flux-pgrep.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ def parse_args():
170170
parser.add_argument(
171171
"-q",
172172
"--queue",
173-
type=str,
174-
metavar="QUEUE",
175-
help="Limit output to specific queue",
173+
type=FilterActionSetUpdate,
174+
metavar="QUEUE,...",
175+
help="Limit output to specific queue or queues",
176176
)
177177
parser.add_argument(
178178
"-c",

t/t2800-jobs-cmd.t

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,10 @@ test_expect_success 'flux-jobs --queue works' '
274274
test_debug "flux jobs -an --queue=foobar" &&
275275
test $(flux jobs -an --queue=foobar | wc -l) -eq 0
276276
'
277+
test_expect_success 'flux-jobs --queue accepts multiple queues' '
278+
test $(flux jobs -anq queue1,queue2 | wc -l) \
279+
-eq $(job_list_state_count completed sched run)
280+
'
277281

278282
# Recall pending = depend | priority | sched, running = run | cleanup,
279283
# active = pending | running

0 commit comments

Comments
 (0)