Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ Using HPC-Launcher within existing PyTorch code with explicitly invoking it from
import hpc_launcher.torch
```

## CLI options for HPC-Launcher `launch` and `torchrun-hpc` commands

- [`launch`](./launch_cli.md) - General purpose HPC job launcher
- [`torchrun-hpc`](./torchrun-hpc_cli_2.md) - PyTorch-specific distributed training launcher

# LBANN: Livermore Big Artificial Neural Network Toolkit

The Livermore Big Artificial Neural Network toolkit (LBANN) is an
Expand Down
14 changes: 12 additions & 2 deletions hpc_launcher/cli/common_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,18 @@ def validate_arguments(args: argparse.Namespace):
if os.path.dirname(output_script):
raise ValueError(f"User provided output script filename cannot be a absolute or relative path: {output_script}")

if args.output_script and not args.launch_dir and not args.bg:
raise ValueError("A output script file name was provided for a ephemeral interative job.")
if args.launch_dir == None and not args.bg: # ephemeral interactive job
if args.output_script:
raise ValueError("A output script file name was provided for a ephemeral interative job.")

if args.out_log_file:
raise ValueError("A output log file name was provided for a ephemeral interative job.")

if args.err_log_file:
raise ValueError("A error log file name was provided for a ephemeral interative job.")

if args.save_hostlist:
raise ValueError("Saving the hostlist was requested for a ephemeral interative job.")

if args.output_script and args.batch_script:
raise ValueError("Cannot specify both an output script name: {args.output_script} and a pre-generated batch script {args.batch_script}.")
Expand Down
2 changes: 1 addition & 1 deletion hpc_launcher/cli/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def main():
args.setup_only,
args.color_stderr,
args.dry_run,
args.launch_dir != None and (args.save_hostlist or args.verbose),
args.launch_dir != None and args.save_hostlist,
args.batch_script != "", # If a batch script is provided don't allow it to be modified
)

Expand Down
18 changes: 6 additions & 12 deletions hpc_launcher/cli/torchrun_hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ def main():

launch_helpers.setup_logging(logger, args.verbose)

if args.fraction_max_gpu_mem and args.fraction_max_gpu_mem != 1.0:
if not args.system_params:
args.system_params = {}
args.system_params["fraction_max_gpu_mem"] = args.fraction_max_gpu_mem

# Process special arguments that can autoselect the number of ranks / GPUs
system = common_args.process_arguments(args, logger)
optimize_comm_protocol = ""
Expand Down Expand Up @@ -104,17 +109,6 @@ def main():
else:
raise Exception(f"Unknown rendezvous {args.rdv} requested.")

if args.fraction_max_gpu_mem and args.fraction_max_gpu_mem != 1.0:
env_list.append(("TORCHRUN_HPC_MAX_GPU_MEM", args.fraction_max_gpu_mem))
else:
if system.active_system_params:
env_list.append(
(
"TORCHRUN_HPC_MAX_GPU_MEM",
system.active_system_params.fraction_max_gpu_mem,
)
)

if args.unswap_rocr_hip_vis_dev:
env_list.append(("TORCHRUN_HPC_UNSWAP_ROCR_HIP_VIS_DEV", "TRUE"))

Expand Down Expand Up @@ -176,7 +170,7 @@ def main():
args.setup_only,
args.color_stderr,
args.dry_run,
args.launch_dir != None and (args.save_hostlist or args.verbose),
args.launch_dir != None and args.save_hostlist,
)

if jobid:
Expand Down
8 changes: 6 additions & 2 deletions hpc_launcher/schedulers/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def build_scheduler_specific_arguments(
self.submit_only_args["--queue"] = f"{self.queue}"

if self.account:
self.submit_only_args["--account"] = f"{self.account}"
self.submit_only_args["--bank"] = f"{self.account}"

if self.reservation:
logger.warning(
Expand Down Expand Up @@ -158,6 +158,10 @@ def num_nodes_in_allocation(cls) -> Optional[int]:

return None

@classmethod
def get_parallel_rank_env_variable(cls) -> str:
    """
    Return the shell-expandable string for the Flux task-rank variable.

    The value is expanded by the generated launch script at run time,
    not by Python, hence the literal ``${...}`` form.

    :return: shell expansion of the rank of the current task under Flux
    """
    # PEP 8: the first argument of a classmethod is conventionally `cls`.
    return "${FLUX_TASK_RANK}"

@classmethod
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
env_vars = [
Expand Down Expand Up @@ -185,7 +189,7 @@ def get_parallel_configuration(cls) -> tuple[int, int, int, int]:

def dynamically_configure_rendezvous_protocol(self, protocol: str) -> list[str]:
env_list = []
env_list.append(("RANK", "${FLUX_TASK_RANK}"))
env_list.append(("RANK", self.get_parallel_rank_env_variable()))
if protocol.lower() == "tcp":
env_list.append(
(
Expand Down
6 changes: 5 additions & 1 deletion hpc_launcher/schedulers/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ def num_nodes_in_allocation(cls) -> Optional[int]:

return None

@classmethod
def get_parallel_rank_env_variable(cls) -> str:
    """
    Return the shell-expandable string for the rank variable under LSF.

    LSF launches ranks via Open MPI, so the per-task rank is published in
    ``OMPI_COMM_WORLD_RANK``; the ``${...}`` form is expanded by the
    generated launch script, not by Python.

    :return: shell expansion of the rank of the current task under LSF
    """
    # PEP 8: the first argument of a classmethod is conventionally `cls`.
    return "${OMPI_COMM_WORLD_RANK}"

@classmethod
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
env_vars = [
Expand All @@ -167,7 +171,7 @@ def get_parallel_configuration(cls) -> tuple[int, int, int, int]:

def dynamically_configure_rendezvous_protocol(self, protocol: str) -> list[str]:
env_list = []
env_list.append(("RANK", "${OMPI_COMM_WORLD_RANK}"))
env_list.append(("RANK", self.get_parallel_rank_env_variable()))
if protocol.lower() == "tcp":
if os.getenv("LSB_HOSTS"):
# When runing under an allocation use the current node as the coordinator
Expand Down
15 changes: 15 additions & 0 deletions hpc_launcher/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,17 @@ def launcher_script(
logger.info(f"Callee directory: {callee_directory} - and {launch_dir}")
script += f"export PYTHONPATH={callee_directory}:" + "${PYTHONPATH}\n"
if save_hostlist:
script += f'export RANK={self.get_parallel_rank_env_variable()}\n'
script += self.export_hostlist()
script += 'if [ "${RANK}" = "0" ]; then\n'
script += " echo ${HPC_LAUNCHER_HOSTLIST} > " + os.path.join(launch_dir, f"hpc_launcher_hostlist.txt\n")
script += "fi\n\n"

if system.active_system_params:
system_params = system.active_system_params
if system_params.fraction_max_gpu_mem and system_params.fraction_max_gpu_mem != 1.0:
script += f'export HPC_LAUNCHER_MAX_GPU_MEM={system_params.fraction_max_gpu_mem}\n'

if self.require_parallel_internal_run_command(blocking):
script += self.internal_script_run_command()
script += " ".join(cmd_args)
Expand Down Expand Up @@ -386,6 +392,15 @@ def num_nodes_in_allocation(cls) -> tuple[int]:
"""
raise NotImplementedError

@classmethod
def get_parallel_rank_env_variable(cls) -> str:
    """
    Return the scheduler-specific, shell-expandable environment-variable
    reference that yields the current task's rank inside an allocation.

    Concrete scheduler subclasses must override this.

    :return: environment variable expansion for the rank in an allocation
    :raises NotImplementedError: always, on the abstract base class
    """
    raise NotImplementedError

@classmethod
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
"""
Expand Down
8 changes: 6 additions & 2 deletions hpc_launcher/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ def num_nodes_in_allocation(cls) -> Optional[int]:

return None

@classmethod
def get_parallel_rank_env_variable(cls) -> str:
    """
    Return the shell-expandable string for the Slurm task-rank variable.

    The ``${...}`` form is expanded by the generated launch script at run
    time, not by Python.

    :return: shell expansion of the rank of the current task under Slurm
    """
    # PEP 8: the first argument of a classmethod is conventionally `cls`.
    return "${SLURM_PROCID}"

@classmethod
def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
# Interesting but unused variables SLURM_JOB_NUM_NODES, SLURM_NPROCS, SLURM_DISTRIBUTION
Expand All @@ -192,12 +196,12 @@ def get_parallel_configuration(cls) -> tuple[int, int, int, int]:
@classmethod
def dynamically_configure_rendezvous_protocol(self, protocol: str) -> str:
env_list = []
env_list.append(("RANK", "${SLURM_PROCID}"))
env_list.append(("RANK", self.get_parallel_rank_env_variable()))
if protocol.lower() == "tcp":
env_list.append(
(
"TORCHRUN_HPC_MASTER_ADDR",
"`printenv SLURM_JOB_NODELIST | /bin/hostlist -n 1`",
"`scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1`",
)
)
env_list.append(("TORCHRUN_HPC_MASTER_PORT", "23456"))
Expand Down
2 changes: 2 additions & 0 deletions hpc_launcher/systems/autodetect.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def find_AMD_gpus() -> (int, float, str):
finally:
try:
smi.amdsmi_shut_down()
return (0, 0, None)
except smi.AmdSmiException as e:
return (0, 0, None)

Expand Down Expand Up @@ -89,6 +90,7 @@ def find_NVIDIA_gpus() -> (int, float, str):
finally:
try:
pynvml.nvmlShutdown()
return (0, 0, None)
except pynvml.NVMLError as e:
return (0, 0, None)

Expand Down
21 changes: 15 additions & 6 deletions hpc_launcher/systems/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def configure_launch(
gpus_per_proc: Optional[int],
gpus_at_least: int = 0,
gpumem_at_least: int = 0,
cli_system_params: Optional[tuple[int, int, str, float, int, str, Optional[float]]] = None,
cli_system_params: Optional[dict[str, str]] = None,
job_comm_protocol: Optional[str] = None,
) -> tuple[System, int, int, int]:
"""
Expand Down Expand Up @@ -63,15 +63,24 @@ def configure_launch(
system_params = system.system_parameters(queue)

# If any system parameters were provided on the command line, potentially overriding any known or discovered system parameters
msg = ""
if cli_system_params:
msg = " (CLI Override) "
if not system_params: # Use a default set of system parameters
system_params = SystemParams()
_cli_system_params_dict = asdict(system_params)
# for the active system params
system.active_system_params = SystemParams()
system_params = system.active_system_params()
for field in fields(system_params):
if field.name in cli_system_params:
_cli_system_params_dict[field.name] = convert_to_type_of_another(cli_system_params[field.name], _cli_system_params_dict[field.name])
# Create a new system_params with the proper fields overwritten
system_params = SystemParams(**_cli_system_params_dict)
system_params.__dict__[field.name] = convert_to_type_of_another(cli_system_params[field.name], system_params.__dict__[field.name])
del cli_system_params[field.name]

for unused_field in cli_system_params.keys():
raise ValueError(f"System Parameters CLI attempt to overwrite unknown field: {unused_field}")

logger.info(
f"Active System Parameters{msg}: {system.active_system_params.prettyprint()}"
)

if not gpus_per_proc:
gpus_per_proc = 0
Expand Down
13 changes: 11 additions & 2 deletions hpc_launcher/systems/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ class SystemParams:
"""Simple data structure to describe an LC system."""

# Number of CPU cores per compute node
cores_per_node: int = 1
cores_per_node: int = 0
# Number of GPUs per node
gpus_per_node: int = 0
# Vendor specific GPU compiler architecture
gpu_arch: str = None
# Number of GB of memory per GPU
mem_per_gpu: float = 0.0
# Number of NUMA domains
numa_domains: int = 1
numa_domains: int = 0
# String name of the Scheduler class
scheduler: str = None
# Optional system level guard to limit GPU/APU memory utilization
Expand All @@ -61,6 +61,15 @@ def procs_per_node(self):
# Assign one rank / process to each NUMA domain to play nice with OPENMP
return self.numa_domains

def prettyprint(self):
    """Return a one-line human-readable summary of these system parameters.

    When ``fraction_max_gpu_mem`` caps GPU memory below 1.0, the summary
    reports the effective (capped) per-GPU memory and appends the physical
    maximum in parentheses.
    """
    capped = self.fraction_max_gpu_mem != 1.0
    shown_gpu_mem = (
        self.fraction_max_gpu_mem * self.mem_per_gpu if capped else self.mem_per_gpu
    )
    cap_note = f" ({self.mem_per_gpu} GB max)" if capped else ""
    return (
        f"{self.scheduler} scheduled system with {self.cores_per_node} cores "
        f"in {self.numa_domains} NUMA domains with "
        f"{self.gpus_per_node} x {shown_gpu_mem}GB {self.gpu_arch} GPUs{cap_note}"
    )

class System:
"""
Expand Down
2 changes: 1 addition & 1 deletion hpc_launcher/torch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@
import os

if torch.cuda.is_available():
fraction_max_gpu_mem = float(os.getenv("TORCHRUN_HPC_MAX_GPU_MEM"))
fraction_max_gpu_mem = float(os.getenv("HPC_LAUNCHER_MAX_GPU_MEM", 1.0))
if fraction_max_gpu_mem != 1.0:
torch.cuda.set_per_process_memory_fraction(fraction_max_gpu_mem)
2 changes: 1 addition & 1 deletion hpc_launcher/torch/torchrun_hpc_trampoline.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main():
backend = None
if torch.cuda.is_available():
backend = "nccl"
fraction_max_gpu_mem = float(os.getenv("TORCHRUN_HPC_MAX_GPU_MEM"))
fraction_max_gpu_mem = float(os.getenv("HPC_LAUNCHER_MAX_GPU_MEM", 1.0))
if fraction_max_gpu_mem != 1.0 and rank == 0:
print(
f"[Rank {rank} of {world_size}] TORCHRUN-HPC set the max GPU memory fraction to {fraction_max_gpu_mem}"
Expand Down
2 changes: 1 addition & 1 deletion hpc_launcher/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

# https://stackoverflow.com/questions/14822184/is-there-a-ceiling-equivalent-of-operator-in-python
def ceildiv(a, b):
    """Return ceil(a / b) as an int, using exact integer arithmetic.

    Avoids float conversion so arbitrarily large ints stay exact.
    """
    quotient, remainder = divmod(a, b)
    # Floor-divide, then bump by one whenever the division was inexact.
    return int(quotient) + (1 if remainder else 0)
Loading