Skip to content

Commit f6a9548

Browse files
authored
Merge pull request #6822 from grondo/add-brokers
add method to request extra brokers on node 0
2 parents 4755f91 + 93d2de0 commit f6a9548

File tree

4 files changed

+161
-98
lines changed

4 files changed

+161
-98
lines changed

src/bindings/python/flux/cli/alloc.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,9 @@
1919
from flux.uri import JobURI
2020

2121

22-
class AllocCmd(base.MiniCmd):
22+
class AllocCmd(base.BatchAllocCmd):
2323
def __init__(self, prog, usage=None, description=None):
24-
self.t0 = None
2524
super().__init__(prog, usage, description, exclude_io=True)
26-
base.add_batch_alloc_args(self.parser)
2725
self.parser.add_argument(
2826
"-v",
2927
"--verbose",
@@ -44,24 +42,14 @@ def __init__(self, prog, usage=None, description=None):
4442
)
4543

4644
def init_jobspec(self, args):
47-
# If number of slots not specified, then set it to node count
48-
# if set, otherwise raise an error.
49-
if not args.nslots:
50-
if not args.nodes:
51-
raise ValueError("Number of slots to allocate must be specified")
52-
args.nslots = args.nodes
53-
args.exclusive = True
45+
self.init_common(args)
5446

5547
# For --bg, do not run an rc2 (initial program) unless
5648
# the user explicitly specified COMMAND:
5749
if args.bg and not args.COMMAND:
5850
args.broker_opts = args.broker_opts or []
5951
args.broker_opts.append("-Sbroker.rc2_none=1")
6052

61-
if args.dump:
62-
args.broker_opts = args.broker_opts or []
63-
args.broker_opts.append("-Scontent.dump=" + args.dump)
64-
6553
jobspec = flux.job.JobspecV1.from_nest_command(
6654
command=args.COMMAND,
6755
num_slots=args.nslots,
@@ -73,6 +61,8 @@ def init_jobspec(self, args):
7361
conf=args.conf.config,
7462
)
7563

64+
self.update_jobspec_common(args, jobspec)
65+
7666
# For --bg, always allocate a pty, but not interactive,
7767
# since an interactive pty causes the job shell to hang around
7868
# until a pty client attaches, which may never happen.

src/bindings/python/flux/cli/base.py

Lines changed: 124 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from flux.idset import IDset
4141
from flux.job import JobspecV1, JobWatcher
4242
from flux.progress import ProgressBar
43-
from flux.util import dict_merge, set_treedict
43+
from flux.util import dict_merge, get_treedict, set_treedict
4444

4545
LOGGER = logging.getLogger("flux")
4646

@@ -496,6 +496,12 @@ def update(self, value):
496496
return self.update_file(value, extension)
497497
return self.update_named_config(value)
498498

499+
def get(self, key, default=None):
500+
"""
501+
Get a value from the current config using dotted key form
502+
"""
503+
return get_treedict(self.config or {}, key, default=default)
504+
499505

500506
class ConfAction(argparse.Action):
501507
"""Handle batch/alloc --conf option"""
@@ -547,7 +553,6 @@ class Xcmd:
547553
"flags": "--flags=",
548554
"begin_time": "--begin-time=",
549555
"signal": "--signal=",
550-
"taskmap": "--taskmap=",
551556
}
552557

553558
class Xinput:
@@ -1652,71 +1657,120 @@ def main(self, args):
16521657
self.run_and_exit()
16531658

16541659

1655-
def add_batch_alloc_args(parser):
1656-
"""
1657-
Add "batch"-specific resource allocation arguments to parser object
1658-
which deal in slots instead of tasks.
1659-
"""
1660-
parser.add_argument(
1661-
"--conf",
1662-
metavar="CONF",
1663-
default=BatchConfig(),
1664-
action=ConfAction,
1665-
help="Set configuration for a child Flux instance. CONF may be a "
1666-
+ "multiline string in JSON or TOML, a configuration key=value, a "
1667-
+ "path to a JSON or TOML file, or a configuration loaded by name "
1668-
+ "from a standard search path. This option may specified multiple "
1669-
+ "times, in which case the config is iteratively updated.",
1670-
)
1671-
parser.add_argument(
1672-
"--broker-opts",
1673-
metavar="OPTS",
1674-
default=None,
1675-
action="append",
1676-
help="Pass options to flux brokers",
1677-
)
1678-
parser.add_argument(
1679-
"--dump",
1680-
nargs="?",
1681-
const="flux-{{jobid}}-dump.tgz",
1682-
metavar="FILE",
1683-
help="Archive KVS on exit",
1684-
)
1685-
parser.add_argument(
1686-
"-n",
1687-
"--nslots",
1688-
type=int,
1689-
metavar="N",
1690-
help="Number of total resource slots requested."
1691-
+ " The size of a resource slot may be specified via the"
1692-
+ " -c, --cores-per-slot and -g, --gpus-per-slot options."
1693-
+ " The default slot size is 1 core.",
1694-
)
1695-
parser.add_argument(
1696-
"-c",
1697-
"--cores-per-slot",
1698-
type=int,
1699-
metavar="N",
1700-
default=1,
1701-
help="Number of cores to allocate per slot",
1702-
)
1703-
parser.add_argument(
1704-
"-g",
1705-
"--gpus-per-slot",
1706-
type=int,
1707-
metavar="N",
1708-
help="Number of GPUs to allocate per slot",
1709-
)
1710-
parser.add_argument(
1711-
"-N",
1712-
"--nodes",
1713-
type=int,
1714-
metavar="N",
1715-
help="Distribute allocated resource slots across N individual nodes",
1716-
)
1717-
parser.add_argument(
1718-
"-x",
1719-
"--exclusive",
1720-
action="store_true",
1721-
help="With -N, --nodes, allocate nodes exclusively",
1722-
)
1660+
class BatchAllocCmd(MiniCmd):
1661+
def __init__(self, prog, usage=None, description=None, exclude_io=True):
1662+
self.t0 = None
1663+
super().__init__(prog, usage, description, exclude_io)
1664+
self.parser.add_argument(
1665+
"--add-brokers", default=0, type=int, help=argparse.SUPPRESS
1666+
)
1667+
self.parser.add_argument(
1668+
"--conf",
1669+
metavar="CONF",
1670+
default=BatchConfig(),
1671+
action=ConfAction,
1672+
help="Set configuration for a child Flux instance. CONF may be a "
1673+
+ "multiline string in JSON or TOML, a configuration key=value, a "
1674+
+ "path to a JSON or TOML file, or a configuration loaded by name "
1675+
+ "from a standard search path. This option may specified multiple "
1676+
+ "times, in which case the config is iteratively updated.",
1677+
)
1678+
self.parser.add_argument(
1679+
"--broker-opts",
1680+
metavar="OPTS",
1681+
default=None,
1682+
action="append",
1683+
help="Pass options to flux brokers",
1684+
)
1685+
self.parser.add_argument(
1686+
"--dump",
1687+
nargs="?",
1688+
const="flux-{{jobid}}-dump.tgz",
1689+
metavar="FILE",
1690+
help="Archive KVS on exit",
1691+
)
1692+
self.parser.add_argument(
1693+
"-n",
1694+
"--nslots",
1695+
type=int,
1696+
metavar="N",
1697+
help="Number of total resource slots requested."
1698+
+ " The size of a resource slot may be specified via the"
1699+
+ " -c, --cores-per-slot and -g, --gpus-per-slot options."
1700+
+ " The default slot size is 1 core.",
1701+
)
1702+
self.parser.add_argument(
1703+
"-c",
1704+
"--cores-per-slot",
1705+
type=int,
1706+
metavar="N",
1707+
default=1,
1708+
help="Number of cores to allocate per slot",
1709+
)
1710+
self.parser.add_argument(
1711+
"-g",
1712+
"--gpus-per-slot",
1713+
type=int,
1714+
metavar="N",
1715+
help="Number of GPUs to allocate per slot",
1716+
)
1717+
self.parser.add_argument(
1718+
"-N",
1719+
"--nodes",
1720+
type=int,
1721+
metavar="N",
1722+
help="Distribute allocated resource slots across N individual nodes",
1723+
)
1724+
self.parser.add_argument(
1725+
"-x",
1726+
"--exclusive",
1727+
action="store_true",
1728+
help="With -N, --nodes, allocate nodes exclusively",
1729+
)
1730+
1731+
def init_common(self, args):
1732+
"""Common initialization code for batch/alloc"""
1733+
# If number of slots not specified, then set it to node count
1734+
# if set, otherwise raise an error.
1735+
if not args.nslots:
1736+
if not args.nodes:
1737+
raise ValueError("Number of slots to allocate must be specified")
1738+
args.nslots = args.nodes
1739+
args.exclusive = True
1740+
1741+
if args.dump:
1742+
args.broker_opts = args.broker_opts or []
1743+
args.broker_opts.append("-Scontent.dump=" + args.dump)
1744+
1745+
if args.add_brokers > 0:
1746+
if not args.nodes:
1747+
raise ValueError(
1748+
"--add-brokers may only be specified with -N, --nnodes"
1749+
)
1750+
nbrokers = args.add_brokers
1751+
nnodes = args.nodes
1752+
1753+
# Force update taskmap with extra ranks on nodeid 0:
1754+
args.taskmap = f"manual:[[0,1,{1+nbrokers},1],[1,{nnodes-1},1,1]]"
1755+
1756+
# Exclude the additional brokers via configuration. However,
1757+
# don't throw away any ranks already excluded bythe user.
1758+
# Note: raises an exception if user excluded by hostname (unlikely)
1759+
exclude = IDset(args.conf.get("resource.exclude", default="")).set(
1760+
1, nbrokers
1761+
)
1762+
args.conf.update(f'resource.exclude="{exclude}"')
1763+
1764+
def update_jobspec_common(self, args, jobspec):
1765+
"""Common jobspec update code for batch/alloc"""
1766+
# If args.add_brokers is being used, update jobspec task count
1767+
# to accurately reflect the updated task count.
1768+
if args.add_brokers > 0:
1769+
# Note: args.nodes required with add_brokers already checked above
1770+
total_tasks = args.nodes + args.add_brokers
1771+
1772+
# Overwrite task count with new total_tasks:
1773+
jobspec.tasks[0]["count"] = {"total": total_tasks}
1774+
1775+
# remove per-resource shell option which is no longer necessary:
1776+
del jobspec.attributes["system"]["shell"]["options"]["per-resource"]

src/bindings/python/flux/cli/batch.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@
2020
LOGGER = logging.getLogger("flux-batch")
2121

2222

23-
class BatchCmd(base.MiniCmd):
23+
class BatchCmd(base.BatchAllocCmd):
2424
def __init__(self, prog, usage=None, description=None):
2525
super().__init__(prog, usage, description)
2626
self.parser.add_argument(
2727
"--wrap",
2828
action="store_true",
2929
help="Wrap arguments or stdin in a /bin/sh script",
3030
)
31-
base.add_batch_alloc_args(self.parser)
3231
self.parser.add_argument(
3332
"SCRIPT",
3433
nargs=argparse.REMAINDER,
@@ -87,21 +86,11 @@ def process_script(self, args):
8786
return self.parse_directive_args(name, batchscript)
8887

8988
def init_jobspec(self, args):
89+
self.init_common(args)
90+
9091
if args.wrap:
9192
self.script = f"#!/bin/sh\n{self.script}"
9293

93-
# If number of slots not specified, then set it to node count
94-
# if set, otherwise raise an error.
95-
if not args.nslots:
96-
if not args.nodes:
97-
raise ValueError("Number of slots to allocate must be specified")
98-
args.nslots = args.nodes
99-
args.exclusive = True
100-
101-
if args.dump:
102-
args.broker_opts = args.broker_opts or []
103-
args.broker_opts.append("-Scontent.dump=" + args.dump)
104-
10594
# If job name is not explicitly set in args, use the script name
10695
# if a script was provided, else the string "batch" to
10796
# indicate the script was set on flux batch stdin.
@@ -124,6 +113,8 @@ def init_jobspec(self, args):
124113
conf=args.conf.config,
125114
)
126115

116+
self.update_jobspec_common(args, jobspec)
117+
127118
# Default output is flux-{{jobid}}.out
128119
# overridden by either --output=none or --output=kvs
129120
if not args.output:

t/t2714-python-cli-batch.t

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,4 +272,32 @@ test_expect_success 'flux batch: multiline --add-file requires name=' '
272272
test_must_fail flux batch --dry-run directives6.sh >d6.out 2>&1 &&
273273
grep "file name missing" d6.out
274274
'
275+
test_expect_success 'flux batch: hidden --add-brokers option requires -N' '
276+
test_must_fail flux batch -n4 --add-brokers=1 --dry-run --wrap true
277+
'
278+
test_expect_success 'flux batch: hidden --add-brokers option works' '
279+
cat <<-EOF >add-brokers.sh &&
280+
#!/bin/sh
281+
# flux: -N4 --output=add-brokers.out --error=add-brokers.err
282+
# flux: --conf=tbon.topo=kary:2
283+
# flux: --add-brokers=2
284+
#
285+
flux resource list -s free -no {nnodes}
286+
flux resource status -s exclude -no {ranks}
287+
flux exec -lr 1-2 flux getattr tbon.parent-endpoint \
288+
| sed s#://.*## | sort
289+
flux run -N4 true
290+
EOF
291+
cat <<-EOF >add-brokers.expected &&
292+
4
293+
1-2
294+
1: ipc
295+
2: ipc
296+
EOF
297+
id=$(flux batch add-brokers.sh) &&
298+
flux job wait-event $id clean &&
299+
test_debug "cat add-brokers.err" &&
300+
test_debug "cat add-brokers.out" &&
301+
test_cmp add-brokers.expected add-brokers.out
302+
'
275303
test_done

0 commit comments

Comments
 (0)