Skip to content

Commit f9a669d

Browse files
committed
cmd: add flux-hostlist
Problem: It would be convenient to have a utility for working with Flux RFC 29 Hostlists -- to create a hostfile for a job or instance, or to compare the hosts assigned to failing jobs -- but no such utility currently exists. Add flux-hostlist(1), a utility that takes one or more hostlist "sources" on the command line and supports several options to query or modify the result. Supported sources include: instance: use the instance hostlist attribute; local (the default): use the hosts assigned to FLUX_JOB_ID if set, otherwise the hostlist attribute of the enclosing instance; avail[able]: use the hosts currently available for job scheduling, i.e. exclude drained and down hosts; stdin or '-': read hosts from stdin; jobid: hosts assigned to a job, or a literal host or list of hosts. The form of the result may be manipulated by one of the set operation options (union, intersect, minus, or xor), and the tool may be used to print the result (limited or modified by options), count its members, sort, etc.
1 parent a8c7ff9 commit f9a669d

File tree

2 files changed

+380
-1
lines changed

2 files changed

+380
-1
lines changed

src/cmd/Makefile.am

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ dist_fluxcmd_SCRIPTS = \
117117
flux-watch.py \
118118
flux-update.py \
119119
flux-imp-exec-helper \
120-
py-runner.py
120+
py-runner.py \
121+
flux-hostlist.py
121122

122123
fluxcmd_PROGRAMS = \
123124
flux-terminus \

src/cmd/flux-hostlist.py

Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
#############################################################
2+
# Copyright 2024 Lawrence Livermore National Security, LLC
3+
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
4+
#
5+
# This file is part of the Flux resource manager framework.
6+
# For details, see https://github.com/flux-framework.
7+
#
8+
# SPDX-License-Identifier: LGPL-3.0
9+
##############################################################
10+
11+
import argparse
12+
import logging
13+
import os
14+
import sys
15+
16+
import flux
17+
import flux.util
18+
from flux.hostlist import Hostlist
19+
from flux.job import JobID, job_list_id
20+
from flux.resource import resource_status
21+
22+
# Logger for this command; handed to flux.util.CLIMain where main() is
# decorated.
LOGGER = logging.getLogger("flux-hostlist")
23+
24+
# Help text describing the accepted SOURCES arguments; used as the argparse
# epilog in parse_args().
sources_description = """
25+
SOURCES may include:
26+
instance hosts from the broker 'hostlist' attribute.
27+
jobid hosts assigned to a job
28+
local hosts assigned to current job if FLUX_JOB_ID is set, otherwise
29+
returns the 'instance' hostlist
30+
avail[able] instance hostlist minus those nodes down or drained
31+
stdin, '-' read a list of hosts on stdin
32+
hosts literal list of hosts
33+
34+
The default when no SOURCES are supplied is 'local'.
35+
"""
36+
37+
38+
def parse_args():
    """Build the flux-hostlist argument parser and parse the command line.

    The output modes (--expand, --count, --nth) are mutually exclusive with
    each other, as are the set operations (--intersect, --minus, --xor).
    All remaining options may be combined freely.

    Returns:
        argparse.Namespace: the parsed options. The positional hostlist
        sources are collected in ``sources`` (an empty list when none are
        given).
    """
    parser = argparse.ArgumentParser(
        prog="flux-hostlist",
        epilog=sources_description,
        formatter_class=flux.util.help_formatter(raw_description=True),
    )

    # At most one output mode may be selected
    output_mode = parser.add_mutually_exclusive_group()
    output_mode.add_argument(
        "-e",
        "--expand",
        action="store_true",
        help="Expand hostlist using defined output delimiter",
    )
    parser.add_argument(
        "-d",
        "--delimiter",
        metavar="S",
        type=str,
        default=" ",
        help="Set output delimiter for expanded hostlist (default=' ')",
    )
    output_mode.add_argument(
        "-c",
        "--count",
        action="store_true",
        help="Print the total number of hosts",
    )
    output_mode.add_argument(
        "-n",
        "--nth",
        metavar="N",
        type=int,
        help="Output host at index N (-N to index from end)",
    )

    # Options that filter or reorder the combined hostlist
    parser.add_argument(
        "-L",
        "--limit",
        type=int,
        metavar="N",
        help="Output at most N hosts (-N for the last N hosts)",
    )
    parser.add_argument(
        "-S",
        "--sort",
        action="store_true",
        help="Return sorted result",
    )
    parser.add_argument(
        "-x",
        "--exclude",
        type=Hostlist,
        metavar="HOSTS",
        help="Exclude all occurrences of HOSTS from final result",
    )
    parser.add_argument(
        "-u",
        "--union",
        "--unique",
        action="store_true",
        help=(
            "Return only unique hosts in the final hostlist. "
            "Without other options, this is the same as the union of all "
            "hostlist args (default mode is append)."
        ),
    )

    # At most one set operation may be used to combine the sources
    set_operation = parser.add_mutually_exclusive_group()
    set_operation.add_argument(
        "-i",
        "--intersect",
        action="store_true",
        help="Return the intersection of all hostlists",
    )
    set_operation.add_argument(
        "-m",
        "--minus",
        action="store_true",
        help="Subtract all hostlist args from first argument",
    )
    set_operation.add_argument(
        "-X",
        "--xor",
        action="store_true",
        help="Return the symmetric difference of all hostlists",
    )

    parser.add_argument(
        "-f",
        "--fallback",
        action="store_true",
        help=(
            "Fallback to treating jobids that are not found as hostnames"
            " (for hostnames that are also valid jobids e.g. f1, fuzz100, etc)"
        ),
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_true",
        help="No output. Exit with nonzero exit status if hostlist is empty",
    )
    parser.add_argument(
        "sources",
        metavar="SOURCES",
        nargs="*",
        help="(optional) One or more hostlist sources",
    )
    return parser.parse_args()
140+
141+
142+
class FluxHandle:
    """Shared Flux handle.

    Calling ``FluxHandle()`` does not create a ``FluxHandle`` instance: it
    returns the object cached in the class attribute ``handle``, which is
    created with ``flux.Flux()`` the first time it is requested.
    """

    def __new__(cls):
        if hasattr(cls, "handle"):
            return cls.handle
        # First use: open the handle and cache it on the class
        cls.handle = flux.Flux()
        return cls.handle
149+
150+
151+
class HostlistResult:
    """Result holding a hostlist built directly from its argument."""

    def __init__(self, hosts, **kwargs):
        # Keyword arguments are accepted but not used by this result type
        parsed = Hostlist(hosts)
        self.result = parsed
156+
157+
158+
class InstanceHostlistResult:
    """Result holding the hostlist from the instance 'hostlist' attribute."""

    def __init__(self, *args, **kwargs):
        # Positional and keyword arguments are accepted but ignored
        handle = FluxHandle()
        hostlist_attr = handle.attr_get("hostlist")
        self.result = Hostlist(hostlist_attr)
163+
164+
165+
class JobHostlistResult:
    """Result holding the hostlist of a job, obtained from job-list.

    ``job_list_id()`` is called when the object is constructed and the
    returned future is stored; the job info is only fetched from that
    future when ``result`` is read.

    Raises:
        ValueError: from the constructor if ``JobID(jobid)`` rejects the
            argument, and from ``result`` if the lookup fails.
    """

    def __init__(self, jobid, **kwargs):
        self.arg = jobid
        self.jobid = JobID(jobid)
        self.fallback = kwargs.get("fallback", False)
        self.future = job_list_id(FluxHandle(), self.jobid, attrs=["nodelist"])

    @property
    def result(self):
        """Hostlist of the job's nodelist.

        When the job is not found and ``fallback`` is set, the original
        argument is interpreted as a hostname instead.
        """
        try:
            job = self.future.get_jobinfo()
        except FileNotFoundError:
            if not self.fallback:
                raise ValueError(f"job {self.arg} not found") from None
            # Fall back to treating potential jobid as hostname
            return Hostlist(self.arg)
        except OSError as exc:
            raise ValueError(f"job {self.arg}: {exc}") from None
        return Hostlist(job.nodelist)
188+
189+
190+
class AvailableHostlistResult:
    """Result holding the available hosts of the enclosing instance."""

    def __init__(self, *args, **kwargs):
        """Fetch the local hostlist and request resource status."""
        # The local hostlist is resolved first and kept in self.hl
        local = LocalHostlistResult()
        self.hl = local.result
        # Send resource status RPCs to get available idset
        self.rstatus = resource_status(FluxHandle())

    @property
    def result(self):
        """Local hostlist indexed by the 'avail' set of the status reply."""
        available = self.rstatus.get().avail
        return self.hl[available]
205+
206+
207+
class LocalHostlistResult:
    """Result holding the 'local' hostlist of the enclosing job or instance."""

    def __init__(self, *args, **kwargs):
        self.jobid_result = None
        jobid = os.environ.get("FLUX_JOB_ID")
        if jobid is None:
            # Not running as a job: this is either an initial program or the
            # enclosing instance is the system instance. Use the instance
            # hostlist attribute in both cases.
            self._base = InstanceHostlistResult()
        else:
            # FLUX_JOB_ID in the environment means this process runs in the
            # context of a job (not the initial program), so the hostlist
            # is fetched via a query to the job-list service.
            self._base = JobHostlistResult(jobid)

    @property
    def result(self):
        """Hostlist of whichever underlying result was selected."""
        return self._base.result
225+
226+
227+
class StdinHostlistResult:
    """Result holding a hostlist read from standard input."""

    def __init__(self, *args, **kwargs):
        # Every input line is appended with trailing whitespace (including
        # the newline) stripped
        hosts = Hostlist()
        for entry in sys.stdin:
            hosts.append(entry.rstrip())
        self.result = hosts
235+
236+
237+
class HostlistResolver:
    """
    Resolve a set of hostlist references in 'sources' into a list of hostlists.

    Each source is turned into a result object when it is appended; the
    hostlists themselves are collected by results().
    """

    # Keyword sources and the result class that handles each of them
    result_types = {
        "instance": InstanceHostlistResult,
        "local": LocalHostlistResult,
        "stdin": StdinHostlistResult,
        "-": StdinHostlistResult,
        "avail": AvailableHostlistResult,
        "available": AvailableHostlistResult,
    }

    def __init__(self, sources, fallback=False):
        self._results = []
        self.fallback = fallback
        for arg in sources:
            self.append(arg)

    def append(self, arg):
        """Add the result object for source ``arg``.

        Keyword sources use their registered result class. Anything else is
        tried as a jobid first and then as a literal hostlist.

        Raises:
            ValueError: if ``arg`` is neither a valid jobid nor a hostlist.
        """
        result_class = self.result_types.get(arg)
        if result_class is not None:
            self._results.append(result_class(arg, fallback=self.fallback))
            return

        try:
            # Try argument as a jobid:
            result = JobHostlistResult(arg, fallback=self.fallback)
        except ValueError:
            try:
                # Try argument as a literal Hostlist
                result = HostlistResult(arg)
            except (TypeError, OSError, ValueError):
                raise ValueError(f"Invalid jobid or hostlist {arg}")
        self._results.append(result)

    def results(self):
        """Return the ``result`` of every source, in the order appended."""
        return [source.result for source in self._results]
275+
276+
277+
def intersect(hl1, hl2):
    """Return a new Hostlist of the unique hosts of hl1 also found in hl2"""
    common = Hostlist()
    for name in hl1:
        if name not in hl2:
            continue
        common.append(name)
    # Drop duplicate entries carried over from hl1
    common.uniq()
    return common
285+
286+
287+
def difference(hl1, hl2):
    """Return a new Hostlist of the hosts of hl1 that are absent from hl2"""
    remaining = Hostlist()
    for name in hl1:
        if name in hl2:
            continue
        remaining.append(name)
    return remaining
294+
295+
296+
def xor(hl1, hl2):
    """Return the unique hosts found in exactly one of hl1 and hl2"""
    only_first = difference(hl1, hl2)
    only_second = difference(hl2, hl1)
    only_first.append(only_second)
    only_first.uniq()
    return only_first
302+
303+
304+
@flux.util.CLIMain(LOGGER)
def main():
    """Entry point of flux-hostlist.

    Resolves the sources given on the command line into hostlists, combines
    them (append by default, or intersect / xor / minus), then applies in
    order: --exclude, --sort, --union, --limit. The result is printed
    according to the selected output mode. With --quiet nothing is printed
    and the exit status is 1 when the final hostlist is empty.
    """
    # Re-open stdout and stderr as utf8 with surrogateescape error handling
    sys.stdout = open(
        sys.stdout.fileno(), "w", encoding="utf8", errors="surrogateescape"
    )
    sys.stderr = open(
        sys.stderr.fileno(), "w", encoding="utf8", errors="surrogateescape"
    )
    args = parse_args()

    # 'local' is the default source
    args.sources = args.sources or ["local"]

    hostlists = HostlistResolver(args.sources, fallback=args.fallback).results()

    if args.intersect or args.xor:
        # Fold the remaining hostlists into the first one
        combine = intersect if args.intersect else xor
        hl = hostlists[0]
        for other in hostlists[1:]:
            hl = combine(hl, other)
    elif args.minus:
        # Subtract every other hostlist from the first one
        hl = hostlists[0]
        for other in hostlists[1:]:
            hl.delete(other)
    else:
        # Default mode: append all hostlists in order
        hl = Hostlist()
        for other in hostlists:
            hl.append(other)

    if args.exclude:
        # Delete all occurrences of args.exclude
        while hl.delete(args.exclude) > 0:
            pass

    if args.sort:
        hl.sort()

    if args.union:
        hl.uniq()

    if args.limit:
        # Positive N keeps the first N hosts, negative N the last N
        hl = hl[: args.limit] if args.limit > 0 else hl[args.limit :]

    if args.quiet:
        sys.stdout = open(os.devnull, "w")

    if args.nth is not None:
        host = hl[args.nth]
        if host:
            print(host)
    elif args.count:
        print(f"{hl.count()}")
    elif hl:
        if args.expand:
            # Convert '\n' specified on command line to actual newline char
            delimiter = args.delimiter.replace("\\n", "\n")
            print(delimiter.join(hl))
        else:
            print(hl.encode())

    if args.quiet and not hl:
        sys.exit(1)


if __name__ == "__main__":
    main()
377+
378+
# vi: ts=4 sw=4 expandtab

0 commit comments

Comments
 (0)