Skip to content

Commit e4b5ccc

Browse files
bvanessen and tbennun authored
Allow CLI overrides for runtime features (#39)
* Fixed a bug in the parsing of key=value arguments where the dictionary was overwritten if multiple CLI options were provided. Added an override arguments CLI option to allow a user to override any existing center-provided default.
* Initial pass of enabling a set of arbitrary CLI override arguments for the runtime launch args.
* Refactored the launch_script function to the scheduler parent class. Added helper functions to customize per scheduler.
* Refactored the launch command function so that it is consolidated in the scheduling parent class.
* Refactoring the function to build command line and batch script arguments.
* Refactored the function to build the command string from each scheduler into the parent class. Added a CLI to provide a custom launch folder name.
* Added control functions to help enable when to append the run arguments to the internal and launch commands.
* Code cleanup.
* Switched order of a few functions to change the order of arguments.
* Ensure that the submit only args are appended to both interactive and batch launch commands. Fix some LSF bugs.
* Adding a set of unit tests for the scheduler and CLI overrides.
* Added the ability to remove a launch or run argument set by the system configuration by passing it with a leading tilde (~). Also corrected the unit testing for the new CLI override.
* Cleanup.
* Apply suggestions from code review. Co-authored-by: Tal Ben-Nun <tbennun@users.noreply.github.com>
* Added a function specialization for passing environment variables to each scheduler.
---------
Co-authored-by: Tal Ben-Nun <tbennun@users.noreply.github.com>
1 parent 4e34b14 commit e4b5ccc

File tree

10 files changed

+516
-407
lines changed

10 files changed

+516
-407
lines changed

hpc_launcher/cli/common_args.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525

2626
class ParseKVAction(argparse.Action):
2727
def __call__(self, parser, namespace, values, option_string=None):
28-
setattr(namespace, self.dest, dict())
28+
if not getattr(namespace, self.dest):
29+
setattr(namespace, self.dest, dict())
2930
for each in values:
3031
try:
3132
key, value = each.split("=")
@@ -125,6 +126,16 @@ def setup_arguments(parser: argparse.ArgumentParser):
125126
help="Indicate if the job will primarily use a specific communication protocol and set any relevant environment variables: MPI or *CCL (NCCL, RCCL)",
126127
)
127128

129+
group.add_argument(
130+
"-x",
131+
"--xargs",
132+
dest="override_args",
133+
nargs='+',
134+
action=ParseKVAction,
135+
help="Specifies scheduler and launch arguments (note it will override any known key): --xargs k1=v1 k2=v2 \n or --xargs k1=v1 --xargs k2=v2 \n Also note that a double dash -- is needed if this is the last argument. \n Arguments with a leading tilde ~ will be removed if found",
136+
metavar="KEY1=VALUE1",
137+
)
138+
128139
# System
129140
group = parser.add_argument_group(
130141
"System",
@@ -136,7 +147,7 @@ def setup_arguments(parser: argparse.ArgumentParser):
136147
dest="system_params",
137148
nargs='+',
138149
action=ParseKVAction,
139-
help="Specifies some or all of the parameters of a system as a dictionary (note it will override any known or autodetected parameters): -p cores_per_node=<int> gpus_per_node=<int> gpu_arch=<str> mem_per_gpu=<float> numa_domains=<int> scheduler=<str>",
150+
help="Specifies some or all of the parameters of a system as a dictionary (note it will override any known or autodetected parameters): -p cores_per_node=<int> gpus_per_node=<int> gpu_arch=<str> mem_per_gpu=<float> numa_domains=<int> scheduler=<str>\n -p cores_per_node=<int> gpus_per_node=<int> \n Also note that a double dash -- is need if this is the last argument",
140151
metavar="KEY1=VALUE1",
141152
)
142153

@@ -188,6 +199,15 @@ def setup_arguments(parser: argparse.ArgumentParser):
188199

189200
group = parser.add_argument_group("Script", "Batch scheduler script parameters")
190201

202+
# different behavior for interactive vs batch jobs
203+
# Add an argument to pick the run directory: tmp, none, self labeled, auto labeled
204+
205+
group.add_argument(
206+
"--launch-dir-name",
207+
default=None,
208+
help="Use a custome name for the launch directory",
209+
)
210+
191211
group.add_argument(
192212
"--run-from-launch-dir",
193213
action="store_true",
@@ -293,6 +313,10 @@ def validate_arguments(args: argparse.Namespace):
293313
raise ValueError(
294314
"The --work-dir and --run-from-launch-dir flags are mutually " "exclusive"
295315
)
316+
if args.launch_dir_name and args.no_launch_dir:
317+
raise ValueError(
318+
"The --launch-dir-name and --no-launch-dir flags are mutually " "exclusive"
319+
)
296320

297321

298322
# See if the system can be autodetected and then process some special arguments

hpc_launcher/cli/launch.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def main():
4646
scheduler = launch_helpers.select_scheduler(args, logger, system)
4747

4848
_, folder_name = scheduler.create_launch_folder_name(
49-
args.command, "launch", args.no_launch_dir
49+
args.command, "launch", args.no_launch_dir, args.launch_dir_name
5050
)
5151

5252
script_file = scheduler.create_launch_folder(
@@ -59,6 +59,7 @@ def main():
5959
script_file,
6060
args.command,
6161
args.args,
62+
args.override_args,
6263
not args.bg,
6364
args.setup_only,
6465
args.color_stderr,

hpc_launcher/cli/torchrun_hpc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def main():
115115
exit(1)
116116

117117
_, folder_name = scheduler.create_launch_folder_name(
118-
args.command, "torchrun_hpc", args.no_launch_dir
118+
args.command, "torchrun_hpc", args.no_launch_dir, args.launch_dir_name
119119
)
120120

121121
script_file = scheduler.create_launch_folder(
@@ -148,6 +148,7 @@ def main():
148148
script_file,
149149
command,
150150
launch_args,
151+
args.override_args,
151152
not args.bg,
152153
# args.output_script,
153154
args.setup_only,

hpc_launcher/schedulers/flux.py

Lines changed: 36 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -33,175 +33,84 @@
3333
@dataclass
3434
class FluxScheduler(Scheduler):
3535

36-
def select_interactive_or_batch(
37-
self,
38-
tmp: list[str],
39-
header: StringIO,
40-
cmd_args: list[str],
41-
blocking: bool = True,
42-
) -> None:
43-
if blocking:
44-
cmd_args += tmp
45-
else:
46-
header.write(f'# FLUX: {" ".join(tmp)}\n')
47-
return
48-
49-
def build_command_string_and_batch_script(
36+
def build_scheduler_specific_arguments(
5037
self, system: "System", blocking: bool = True
51-
) -> (str, list[str]):
52-
53-
env_vars = system.environment_variables()
54-
passthrough_env_vars = system.passthrough_environment_variables()
55-
# Enable the system to apply some customization to the scheduler instance
56-
system.customize_scheduler(self)
57-
58-
header = StringIO()
59-
header.write("#!/bin/sh\n")
60-
cmd_args = []
38+
):
6139
if self.out_log_file and not blocking:
62-
header.write(f"# FLUX: --output={self.out_log_file}\n")
40+
self.submit_only_args[f"--output"] = f"{self.out_log_file}"
6341
if self.err_log_file and not blocking:
64-
header.write(f"# FLUX: --error={self.err_log_file}\n")
65-
66-
# Unbuffered output
67-
tmp = "-u"
68-
cmd_args += [tmp]
69-
if not blocking:
70-
header.write(f"# FLUX: {tmp}\n")
42+
self.submit_only_args[f"--error"] = f"{self.err_log_file}"
7143

7244
# Number of Nodes
73-
tmp = f"-N{self.nodes}"
74-
cmd_args += [tmp]
75-
if not blocking:
76-
header.write(f"# FLUX: {tmp}\n")
45+
self.common_launch_args[f"-N{self.nodes}"] = None
7746

7847
# Total number of Tasks / Processes
79-
tmp = f"-n{self.nodes * self.procs_per_node}"
80-
cmd_args += [tmp]
81-
if not blocking:
82-
header.write(f"# FLUX: {tmp}\n")
48+
self.common_launch_args[f"-n{self.nodes * self.procs_per_node}"] = None
49+
50+
# Unbuffered output
51+
self.common_launch_args["-u"] = None
8352

8453
# Set the Number of GPUs per task
8554
# There is a difference in option names between tasks and allocations
8655
if self.gpus_per_proc > 0:
8756
tmp = f"{self.gpus_per_proc}"
8857
# command line flag for a task
89-
self.run_launch_args["--gpus-per-task"] = tmp
58+
self.run_only_args["--gpus-per-task"] = tmp
9059
# command and shell flags for an allocation
91-
self.batch_submit_args["--gpus-per-slot"] = tmp
92-
self.batch_script_header["# FLUX: --gpus-per-slot"] = tmp
60+
if not blocking:
61+
self.submit_only_args["--gpus-per-slot"] = tmp
9362

9463
if self.work_dir:
95-
tmp = [f"--setattr=system.cwd={os.path.abspath(self.work_dir)}"]
96-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
64+
self.submit_only_args["--setattr=system.cwd"] = f"{os.path.abspath(self.work_dir)}"
9765

98-
tmp = ["-onosetpgrp"]
99-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
66+
self.common_launch_args["-onosetpgrp"] = None
10067

10168
if self.ld_preloads:
102-
tmp = [f'--env=LD_PRELOAD={",".join(self.ld_preloads)}']
103-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
69+
self.common_launch_args['--env=LD_PRELOAD'] = f'{",".join(self.ld_preloads)}'
10470

10571
if self.time_limit is not None:
106-
tmp = [f"--time={self.time_limit}m"]
107-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
72+
self.common_launch_args["--time"] = f"{self.time_limit}m"
10873

10974
if self.job_name:
110-
tmp = [f"--job-name={self.job_name}"]
111-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
75+
self.common_launch_args["--job-name"] = f"{self.job_name}"
11276

11377
if self.queue:
11478
if os.getenv("FLUX_URI"):
11579
logger.warning(
11680
f"WARNING: Dropping unsupported option requested when running inside of an allocation: --queue={self.queue}"
11781
)
11882
else:
119-
tmp = [f"--queue={self.queue}"]
120-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
83+
self.submit_only_args["--queue"] = f"{self.queue}"
12184

12285
if self.account:
123-
tmp = [f"--account={self.account}"]
124-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
86+
self.submit_only_args["--account"] = f"{self.account}"
12587

12688
if self.reservation:
12789
logger.warning(
12890
f"WARNING: Unsupported option requested: --reservation={self.reservation}"
12991
)
13092

131-
if self.launcher_flags:
132-
for flag in self.launcher_flags:
133-
# These flag should only be on the launcher commands not the batch commands
134-
cmd_args += [flag]
93+
return
13594

136-
if not blocking: # Only add batch script header items on non-blocking calls
137-
for k,v in self.batch_script_header.items():
138-
header.write(f"{k}={v}\n")
95+
def batch_script_prefix(self) -> str:
96+
return "# FLUX:"
13997

140-
for e in env_vars:
141-
header.write(parse_env_list(*e))
98+
def blocking_launch_command(self) -> list[str]:
99+
return ["flux", "run"]
142100

101+
def nonblocking_launch_command(self) -> list[str]:
102+
return ["flux", "batch"]
103+
104+
def cli_passthrough_env_arg(self, passthrough_env_vars) -> None:
143105
for k, v in passthrough_env_vars:
144-
if not blocking:
145-
cmd_args += [f" --env={k}={v}"]
146-
else:
147-
header += f"export {k}={v}\n"
148-
149-
return (header.getvalue(), cmd_args)
150-
151-
def launch_command(self, system: "System", blocking: bool = True) -> list[str]:
152-
# Launch command only use the cmd_args to construct the shell script to be launched
153-
(header_lines, cmd_args) = self.build_command_string_and_batch_script(
154-
system, blocking
155-
)
156-
157-
if not blocking:
158-
for k,v in self.batch_submit_args.items():
159-
cmd_args += [f"{k}={v}"]
160-
return ["flux", "batch"] + cmd_args
161-
162-
for k,v in self.run_launch_args.items():
163-
cmd_args += [f"{k}={v}"]
164-
return ["flux", "run"] + cmd_args
165-
166-
def launcher_script(
167-
self,
168-
system: "System",
169-
command: str,
170-
args: Optional[list[str]] = None,
171-
blocking: bool = True,
172-
save_hostlist: bool = False,
173-
launch_dir: str = "",
174-
) -> str:
175-
176-
script = ""
177-
# Launcher script only use the header_lines to construct the shell script to be launched
178-
(header_lines, cmd_string) = self.build_command_string_and_batch_script(
179-
system, blocking
180-
)
181-
for k,v in self.run_launch_args.items():
182-
cmd_string += [f"{k}={v}"]
183-
184-
script += header_lines
185-
script += "\n"
186-
if save_hostlist:
187-
script += "export HPC_LAUNCHER_HOSTLIST=$(flux hostlist local)\n"
188-
script += 'if [ "${RANK}" = "0" ]; then\n'
189-
script += " echo ${HPC_LAUNCHER_HOSTLIST} > " + os.path.join(launch_dir, f"hpc_launcher_hostlist.txt\n")
190-
script += "fi\n\n"
191-
192-
if not blocking:
193-
script += "flux run "
194-
script += " ".join(cmd_string)
195-
script += " "
196-
197-
script += f"{command}"
198-
199-
for arg in args:
200-
script += f" {arg}"
201-
202-
script += "\n"
203-
204-
return script
106+
self.submit_only_args[f"--env={k}"] = f"{v}"
107+
return
108+
109+
def export_hostlist(self) -> str:
110+
return "export HPC_LAUNCHER_HOSTLIST=$(flux hostlist local)\n"
111+
112+
def internal_script_run_command(self) -> str:
113+
return "flux run "
205114

206115
def get_job_id(self, output: str) -> Optional[str]:
207116
# The job ID is the only printout when calling flux batch

0 commit comments

Comments
 (0)