Skip to content

Commit 34cbab3

Browse files
bvanessen and tbennun authored
Improve nested execution (#21)
* When running under the flux scheduler in an allocation, the --queue argument is not valid. Add a warning and guard to drop it when appropriate.
* Added guards to avoid initializing torch distributed when running in a single rank launch. Also added code to detect if a launch command is inside of an existing allocation, and if so be able to report how many ranks are available within the allocation. Update the torchrun-hpc tests to ensure that if run inside of an allocation, skip the tests when there are insufficient resources.
* Added support for LSF systems.
* Fixed which environment variable to use for number of nodes in a slurm allocation.
* Fixed bug when command_line parameter isn't set.
* Changed the torchrun_hpc_stub.py file to be called torchrun_hpc_trampoline.py. Moved the file to the torch directory. The file is no longer renamed to avoid triggering spurious pytest calls.
* Renamed flag run-from-dir to run-from-launch-dir.
* Added the no-launch-dir flag to avoid creating a timestamped directory for launching the script. Instead it will create the launch script and log files in the current working directory.
* Added a command line argument --save-hostlist to enable writing the hostlist to the file. By default the launch script will no longer do this.
* Enable verbose mode to save the hostlist.
* Apply suggestions from code review Co-authored-by: Tal Ben-Nun <tbennun@users.noreply.github.com>
* Changed the trampoline to always check if distributed PyTorch is initialized and then destroy it at the end.
--------- Co-authored-by: Tal Ben-Nun <tbennun@users.noreply.github.com>
1 parent 3488114 commit 34cbab3

File tree

12 files changed

+182
-74
lines changed

12 files changed

+182
-74
lines changed

hpc_launcher/cli/common_args.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def setup_arguments(parser: argparse.ArgumentParser):
4444
'-v',
4545
action='store_true',
4646
default=False,
47-
help='Run in verbose mode')
47+
help='Run in verbose mode. Also save the hostlist as if --save-hostlist is set')
4848

4949
# Job size arguments
5050
group = parser.add_argument_group(
@@ -134,12 +134,20 @@ def setup_arguments(parser: argparse.ArgumentParser):
134134
'Batch scheduler script parameters')
135135

136136
group.add_argument(
137-
'--run-from-dir',
137+
'--run-from-launch-dir',
138138
action='store_true',
139139
default=False,
140140
help='If set, the launcher will run the command from the timestamped '
141141
'launch directory')
142142

143+
group.add_argument(
144+
'--no-launch-dir',
145+
action='store_true',
146+
default=False,
147+
help='If set, the launcher will not create a timestamped launch directory. '
148+
'Instead, it will create the launch file and logs in the current working '
149+
'directory')
150+
143151
group.add_argument(
144152
'-o',
145153
'--output-script',
@@ -170,6 +178,11 @@ def setup_arguments(parser: argparse.ArgumentParser):
170178
help='Add a reservation arguement to scheduler. '
171179
'Typically used for Dedecated Application Time runs (DATs)')
172180

181+
group.add_argument(
182+
'--save-hostlist',
183+
action='store_true',
184+
default=False,
185+
help='Write the hostlist to a file: hpc_launcher_hostlist.txt.')
173186

174187
def validate_arguments(args: argparse.Namespace):
175188
"""
@@ -211,9 +224,9 @@ def validate_arguments(args: argparse.Namespace):
211224
if args.local and args.scheduler:
212225
raise ValueError('The --local and --scheduler flags are mutually '
213226
'exclusive')
214-
if args.work_dir and args.run_from_dir:
227+
if args.work_dir and args.run_from_launch_dir:
215228
raise ValueError(
216-
'The --work-dir and --run-from-dir flags are mutually '
229+
'The --work-dir and --run-from-launch-dir flags are mutually '
217230
'exclusive')
218231

219232
# See if the system can be autodetected and then process some special arguments

hpc_launcher/cli/launch.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,18 @@ def main():
4343
# Pick batch scheduler
4444
scheduler = launch_helpers.select_scheduler(args, logger, system)
4545

46-
_, folder_name = scheduler.create_launch_folder_name(args.command, 'launch')
46+
_, folder_name = scheduler.create_launch_folder_name(args.command, 'launch', args.no_launch_dir)
4747

4848
script_file = scheduler.create_launch_folder(folder_name,
4949
not args.bg,
5050
args.output_script,
51-
args.run_from_dir)
51+
args.run_from_launch_dir)
5252

5353
jobid = scheduler.launch(system, folder_name, script_file,
5454
args.command, args.args, not args.bg,
5555
args.setup_only,
56-
args.color_stderr, args.run_from_dir)
56+
args.color_stderr, args.run_from_launch_dir,
57+
(args.save_hostlist or args.verbose))
5758

5859
if jobid:
5960
logger.info(f'Job ID: {jobid}')

hpc_launcher/cli/torchrun_hpc.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,23 +98,24 @@ def main():
9898
)
9999
exit(1)
100100

101-
command_as_folder_name, folder_name = scheduler.create_launch_folder_name(args.command,
102-
'torchrun_hpc',)
101+
_, folder_name = scheduler.create_launch_folder_name(args.command,
102+
'torchrun_hpc',
103+
args.no_launch_dir)
103104

104105
script_file = scheduler.create_launch_folder(folder_name,
105106
not args.bg,
106107
args.output_script,
107-
args.run_from_dir)
108+
args.run_from_launch_dir)
108109

109-
stub_file = 'torchrun_hpc_' + command_as_folder_name
110+
trampoline_file = 'torchrun_hpc_trampoline.py'
110111

111112
if os.path.exists(folder_name):
112-
copied_stub_file = folder_name + '/' + stub_file
113+
copied_trampoline_file = folder_name + '/' + trampoline_file
113114
package_path = os.path.dirname(os.path.abspath(__file__))
114-
shutil.copy(os.path.join(package_path, 'torchrun_hpc_stub.py'), copied_stub_file)
115+
shutil.copy(os.path.join(package_path, '..', 'torch', trampoline_file), copied_trampoline_file)
115116

116117
command = sys.executable
117-
launch_args = ['-u', f'{os.path.abspath(folder_name)}/{stub_file}', os.path.abspath(args.command)]
118+
launch_args = ['-u', f'{os.path.abspath(folder_name)}/{trampoline_file}', os.path.abspath(args.command)]
118119
launch_args += args.args
119120

120121
logger.info(f'Running job in directory: {folder_name}')
@@ -123,7 +124,8 @@ def main():
123124
command, launch_args, not args.bg,
124125
# args.output_script,
125126
args.setup_only,
126-
args.color_stderr, args.run_from_dir)
127+
args.color_stderr, args.run_from_launch_dir,
128+
(args.save_hostlist or args.verbose))
127129

128130
if jobid:
129131
logger.info(f'Job ID: {jobid}')

hpc_launcher/schedulers/flux.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from typing import TYPE_CHECKING, Optional
1616
from io import StringIO
1717
import os
18+
import subprocess
19+
import re
1820

1921
if TYPE_CHECKING:
2022
# If type-checking, import the other class
@@ -97,8 +99,13 @@ def build_command_string_and_batch_script(self,
9799
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
98100

99101
if self.queue:
100-
tmp = [f'--queue={self.queue}']
101-
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
102+
if os.getenv('FLUX_URI'):
103+
logger.warning(
104+
f'WARNING: Dropping unsupported option requested when running inside of an allocation: --queue={self.queue}'
105+
)
106+
else:
107+
tmp = [f'--queue={self.queue}']
108+
self.select_interactive_or_batch(tmp, header, cmd_args, blocking)
102109

103110
if self.account:
104111
tmp = [f'--account={self.account}']
@@ -141,7 +148,8 @@ def launcher_script(self,
141148
system: 'System',
142149
command: str,
143150
args: Optional[list[str]] = None,
144-
blocking: bool = True) -> str:
151+
blocking: bool = True,
152+
save_hostlist: bool = False) -> str:
145153

146154
script = ''
147155
# Launcher script only use the header_lines to construct the shell script to be launched
@@ -150,7 +158,8 @@ def launcher_script(self,
150158
system, blocking)
151159
script += header_lines
152160
script += '\n'
153-
script += 'export HPC_LAUNCHER_HOSTLIST=$(flux hostlist local)\n'
161+
if save_hostlist:
162+
script += 'export HPC_LAUNCHER_HOSTLIST=$(flux hostlist local)\n'
154163

155164
if not blocking:
156165
script += 'flux run '
@@ -170,6 +179,17 @@ def get_job_id(self, output: str) -> Optional[str]:
170179
# The job ID is the only printout when calling flux batch
171180
return output.strip()
172181

182+
@classmethod
183+
def num_nodes_in_allocation(cls) -> Optional[int]:
184+
if os.getenv('FLUX_URI'):
185+
cmd = ['flux', 'resource', 'info']
186+
proc = subprocess.run(cmd, universal_newlines=True, capture_output=True)
187+
m = re.search(r'^(\d*) Nodes, (\d*) Cores, (\d*) GPUs$', proc.stdout)
188+
if m:
189+
return int(m.group(1))
190+
191+
return None
192+
173193
@classmethod
174194
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
175195
env_vars = [

hpc_launcher/schedulers/local.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def launcher_script(self,
4141
system: 'System',
4242
command: str,
4343
args: Optional[list[str]] = None,
44-
blocking: bool = True) -> str:
44+
blocking: bool = True,
45+
save_hostlist: bool = False) -> str:
4546
envvars = [
4647
f'export {k}={v}' for k, v in system.environment_variables()
4748
]
@@ -51,8 +52,11 @@ def launcher_script(self,
5152
]
5253
envvars += [
5354
'export RANK=0',
54-
'export HPC_LAUNCHER_HOSTLIST=$(hostname)',
5555
]
56+
if save_hostlist:
57+
envvars += [
58+
'export HPC_LAUNCHER_HOSTLIST=$(hostname)',
59+
]
5660
header = '\n'.join(envvars)
5761

5862
if self.work_dir:

hpc_launcher/schedulers/lsf.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,16 @@ def launcher_script(self,
127127
system: 'System',
128128
command: str,
129129
args: Optional[list[str]] = None,
130-
blocking: bool = True) -> str:
130+
blocking: bool = True,
131+
save_hostlist: bool = False) -> str:
131132

132133
script = ''
133134
# Launcher script only use the header_lines to construct the shell script to be launched
134135
(header_lines, cmd_string, parallel_run_args) = self.build_command_string_and_batch_script(system, blocking)
135136
script += header_lines
136137
script += '\n'
137-
script += "export HPC_LAUNCHER_HOSTLIST=$(echo $LSB_HOSTS | tr ' ' '\\n' | sort -u)\n\n"
138+
if save_hostlist:
139+
script += "export HPC_LAUNCHER_HOSTLIST=$(echo $LSB_HOSTS | tr ' ' '\\n' | sort -u)\n\n"
138140

139141
if not blocking or (blocking and not os.getenv('LSB_HOSTS')):
140142
script += 'jsrun '
@@ -153,6 +155,13 @@ def launcher_script(self,
153155
def get_job_id(self, output: str) -> Optional[str]:
154156
raise NotImplementedError
155157

158+
@classmethod
159+
def num_nodes_in_allocation(cls) -> Optional[int]:
160+
if os.getenv('LLNL_NUM_COMPUTE_NODES'):
161+
return int(os.getenv('LLNL_NUM_COMPUTE_NODES'))
162+
163+
return None
164+
156165
@classmethod
157166
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
158167
env_vars = ['OMPI_COMM_WORLD_SIZE', 'OMPI_COMM_WORLD_RANK', 'OMPI_COMM_WORLD_LOCAL_RANK', 'OMPI_COMM_WORLD_LOCAL_SIZE']

hpc_launcher/schedulers/scheduler.py

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,19 @@ def launch_command(self,
113113
def launcher_script(self,
114114
system: 'System',
115115
command: str,
116-
args: Optional[list[str]] = None) -> str:
116+
args: Optional[list[str]] = None,
117+
blocking: bool = True,
118+
save_hostlist: bool = False) -> str:
117119
"""
118120
Returns the full launcher script, which can be saved as a batch
119121
script, for the given system and launcher configuration.
120122
This script usually performs node/resource allocation and manages I/O.
121123
122124
:param system: The system to use.
125+
:param command: The command to launch
126+
:param args: Optional list of argument for the command to launch
127+
:param blocking: Launch the command interactively if true, else in a batch job
128+
:param save_hostlist: Add local scripting to capture the list of hosts the command is launched on
123129
:return: A shell script as a string.
124130
"""
125131
raise NotImplementedError
@@ -149,6 +155,15 @@ def get_job_id(self, output: str) -> Optional[str]:
149155
"""
150156
return None
151157

158+
@classmethod
159+
def num_nodes_in_allocation(cls) -> tuple[int]:
160+
"""
161+
When running under an allocation, check how many nodes are available
162+
163+
:return: Number of nodes in an allocation
164+
"""
165+
raise NotImplementedError
166+
152167
@classmethod
153168
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
154169
"""
@@ -193,7 +208,8 @@ def setup_rendezvous_protocol(self, protocol: str) -> list[str]:
193208

194209
def create_launch_folder_name(self,
195210
command: str,
196-
folder_prefix: str = 'launch'
211+
folder_prefix: str = 'launch',
212+
no_launch_dir: bool = False,
197213
) -> (str, str):
198214
"""
199215
Create a folder name for the launcher based on the command.
@@ -206,26 +222,29 @@ def create_launch_folder_name(self,
206222
command_as_folder_name = os.path.basename(command).replace(' ', '_').replace(';','-')
207223
# Create a folder for the output and error logs
208224
# Timestamp is of the format YYYY-MM-DD_HHhMMmSSs
209-
folder_name = f'{folder_prefix}-{self.job_name or command_as_folder_name}_{time.strftime("%Y-%m-%d_%Hh%Mm%Ss")}'
225+
if no_launch_dir:
226+
folder_name = os.getcwd()
227+
else:
228+
folder_name = f'{folder_prefix}-{self.job_name or command_as_folder_name}_{time.strftime("%Y-%m-%d_%Hh%Mm%Ss")}'
210229
return (command_as_folder_name, folder_name)
211230

212231
def create_launch_folder(self,
213232
folder_name: str,
214233
blocking: bool = True,
215234
script_file: Optional[str] = None,
216-
run_from_dir: bool = False,
235+
run_from_launch_dir: bool = False,
217236
) -> (str, str):
218237
"""
219238
Create a folder and associated launch script if appropriate.
220239
221240
:param folder_name: The name of the folder for containing all of the launch artifacts.
222241
:param blocking: If True, the job should run from the launch folder.
223242
:param script_file: If given, saves the output script to this file.
224-
:param run_from_dir: If True, runs the command from the launch folder.
243+
:param run_from_launch_dir: If True, runs the command from the launch folder.
225244
:return: The filename for the launch script as a string.
226245
"""
227246

228-
should_make_folder = blocking or run_from_dir
247+
should_make_folder = blocking or run_from_launch_dir
229248

230249
# Create a temporary file or a script file, if given
231250
if script_file is not None:
@@ -265,7 +284,8 @@ def launch(self,
265284
blocking: bool = True,
266285
setup_only: bool = False,
267286
color_stderr: bool = False,
268-
run_from_dir: bool = False) -> str:
287+
run_from_launch_dir: bool = False,
288+
save_hostlist: bool = False) -> str:
269289
"""
270290
Launches the given command and arguments using this launcher.
271291
@@ -278,13 +298,14 @@ def launch(self,
278298
and redirects/duplicates outputs to the terminal.
279299
:param setup_only: If True, only sets up the job and does not launch it.
280300
:param color_stderr: If True, colors stderr terminal outputs in red.
281-
:param run_from_dir: If True, runs the command from the launch directory.
301+
:param run_from_launch_dir: If True, runs the command from the launch directory.
302+
:param save_hostlist: Add local scripting to capture the list of hosts the command is launched on
282303
:return: The queued job ID as a string.
283304
"""
284305

285306
# If the command is run from a directory, and the command exists as a
286307
# file, use its absolute path
287-
if run_from_dir:
308+
if run_from_launch_dir:
288309
if os.path.isfile(command):
289310
command = os.path.abspath(command)
290311
# Change the working directory to the launch folder
@@ -299,13 +320,16 @@ def launch(self,
299320

300321
logger.info(f'Script filename: {filename}')
301322
with open(filename, 'w') as fp:
302-
fp.write(self.launcher_script(system, command, args, blocking))
303-
fp.write('\nif [ "${RANK}" = "0" ]; then')
304-
fp.write('\n echo ${HPC_LAUNCHER_HOSTLIST} > '
305-
+ os.path.join(os.path.dirname(filename), f'hpc_launcher_hostlist.txt\n'))
306-
fp.write('fi\n')
323+
fp.write(self.launcher_script(system, command, args, blocking, save_hostlist))
324+
if save_hostlist:
325+
fp.write('\nif [ "${RANK}" = "0" ]; then')
326+
fp.write('\n echo ${HPC_LAUNCHER_HOSTLIST} > '
327+
+ os.path.join(os.path.dirname(filename), f'hpc_launcher_hostlist.txt\n'))
328+
fp.write('fi\n')
329+
307330
fp.write(f'\n# Launch command: ' + ' '.join(full_cmdline) + '\n')
308-
fp.write(f'# User command invoked: ' + ' '.join(self.command_line) + '\n')
331+
if self.command_line:
332+
fp.write(f'# User command invoked: ' + ' '.join(self.command_line) + '\n')
309333
os.chmod(filename, 0o700)
310334

311335
if setup_only:

0 commit comments

Comments
 (0)