Commit 34d5276

Slurm support for QAT simplified flow
Signed-off-by: Jennifer Chen <[email protected]>
1 parent 6ec8cdc commit 34d5276

7 files changed: +568 additions, -122 deletions

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from nemo.collections.llm.modelopt import setup_trainer_and_restore_model_with_modelopt_spec

from modelopt.torch.export.plugins.nemo_run import _get_most_recent_ckpt
from modelopt.torch.utils.plugins.megatron_mmlu import megatron_mmlu


def parse_args():
    parser = argparse.ArgumentParser(
        description="Run MMLU evaluation with ModelOpt Megatron model. Provide either --nemo_ckpt or --ckpt_dir"
    )
    parser.add_argument("--nemo_ckpt", type=str, required=False, help="Path to NeMo checkpoint.")
    parser.add_argument(
        "--ckpt_dir",
        required=False,
        type=str,
        help="Checkpoint directory of 1 or more finetuned models",
    )
    parser.add_argument(
        "--tensor_parallelism", type=int, default=1, help="Tensor parallelism size."
    )
    parser.add_argument(
        "--pipeline_parallelism", type=int, default=1, help="Pipeline parallelism size."
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    assert args.nemo_ckpt or args.ckpt_dir, "Provide one of either --nemo_ckpt or --ckpt_dir."
    ckpt_path = args.nemo_ckpt
    if args.ckpt_dir:
        ckpt_path = _get_most_recent_ckpt(args.ckpt_dir)
    model, trainer = setup_trainer_and_restore_model_with_modelopt_spec(
        ckpt_path,
        tensor_model_parallel_size=args.tensor_parallelism,
        pipeline_model_parallel_size=args.pipeline_parallelism,
    )
    tokenizer = model.tokenizer.tokenizer
    megatron_mmlu(model.module, tokenizer)
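
The script above restores a checkpoint with the ModelOpt Megatron spec and runs MMLU on it. Below is a minimal sketch of the same flow invoked directly from Python rather than via the CLI flags; the checkpoint path is a placeholder and the parallelism sizes simply mirror the script's defaults.

# Hypothetical direct-call equivalent of the CLI entry point above;
# "/results/finetuned_model" is a placeholder checkpoint path, not a value from this commit.
from nemo.collections.llm.modelopt import setup_trainer_and_restore_model_with_modelopt_spec
from modelopt.torch.utils.plugins.megatron_mmlu import megatron_mmlu

model, trainer = setup_trainer_and_restore_model_with_modelopt_spec(
    "/results/finetuned_model",  # placeholder checkpoint path
    tensor_model_parallel_size=1,
    pipeline_model_parallel_size=1,
)
megatron_mmlu(model.module, model.tokenizer.tokenizer)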
File renamed without changes.
File renamed without changes.
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import os
from pathlib import Path

from datasets import load_dataset


def get_parser():
    parser = argparse.ArgumentParser(description="Process nvidia/OpenScience dataset")
    parser.add_argument("--output-dir", type=str, default=".")
    return parser


def convert_row_oai(row: dict):
    return {
        "messages": [
            {"role": "user", "content": row["input"]},
            {"role": "assistant", "content": row["output"]},
        ]
    }


def process_subset(raw_dir, proc_dir):
    ds = load_dataset(raw_dir)
    ds = ds.map(convert_row_oai, remove_columns=["input", "output"])

    split_ds = ds["train"].train_test_split(test_size=0.1)
    split_ds["train"].to_json(os.path.join(proc_dir, "training.jsonl"))
    split_ds["test"].to_json(os.path.join(proc_dir, "validation.jsonl"))


# TODO remove below?
def sample_openscience(raw_dir, proc_dir, sample_ratio=1):
    """Process raw OpenScience data by subsampling the dataset by default, then
    writing into train/val split with 99/1 ratio"""
    files = os.listdir(raw_dir)
    num_data = 0

    for file in files:
        # Open each jsonl
        if file.endswith("jsonl"):
            print(f"Sampling from {file}")
            with (
                open(os.path.join(raw_dir, file)) as f_raw,
                open(os.path.join(proc_dir, "training.jsonl"), "a") as f_train,
                open(os.path.join(proc_dir, "validation.jsonl"), "a") as f_val,
            ):
                for idx, line in enumerate(f_raw):
                    if idx % sample_ratio != 0:
                        continue
                    data = json.loads(line)
                    # convert dictionary to OpenAI chat: from {"input": "...", "output": "..."}
                    # to [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}]
                    data = {
                        "messages": [
                            {"role": "user", "content": data["input"]},
                            {"role": "assistant", "content": data["output"]},
                        ]
                    }

                    if num_data % 100 == 0:
                        f_val.write(json.dumps(data) + "\n")
                    else:
                        f_train.write(json.dumps(data) + "\n")
                    num_data += 1


if __name__ == "__main__":
    args = get_parser().parse_args()
    raw_dir = f"{args.output_dir}/openscience_raw"
    proc_dir = f"{args.output_dir}/openscience_proc"

    if not os.path.exists(raw_dir):
        # download_hf_dataset("nvidia/OpenScience", raw_dir)
        q235_subset = load_dataset("nvidia/OpenScience", data_files="OS-Q3-235B-4.jsonl")
        q235_subset.save_to_disk(raw_dir)

    if not os.path.exists(proc_dir):
        Path(proc_dir).mkdir(exist_ok=True)
        print("Processing OpenScience dataset")
        process_subset(raw_dir, proc_dir)
    else:
        print(f"Processed OpenScience dataset exists in: {proc_dir}, skipped processing")
    # process_openscience(raw_dir, proc_dir)
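
For reference, convert_row_oai maps one raw record to the OpenAI chat format used for finetuning. A tiny illustrative sketch with a made-up record (not from the dataset):

# Illustrative only: a made-up OpenScience-style record and the chat-format
# dictionary that convert_row_oai returns for it.
row = {"input": "What is 2 + 2?", "output": "4"}
print(convert_row_oai(row))
# {'messages': [{'role': 'user', 'content': 'What is 2 + 2?'},
#               {'role': 'assistant', 'content': '4'}]}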

examples/nemo_run/common/utils.py

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
from dataclasses import dataclass, field

import nemo_run as run
from nemo.collections import llm


@dataclass
class SlurmConfig:
    """Configuration for SlurmExecutor."""

    account: str = ""  # Your Slurm account
    partition_cpu: str = ""  # Slurm CPU partition to use
    partition_gpu: str = ""  # Slurm GPU partition to use
    time: str = ""  # Job time limit (HH:MM:SS)
    container_image: str = ""  # Container image for jobs
    env_vars: dict[str, str] = field(default_factory=dict)  # Environment variables to set
    container_mounts: list[str] = field(default_factory=list)  # Container mounts
    use_local_tunnel: bool = False  # Set to True if running from within the cluster
    host: str = ""  # Required for SSH tunnel: Slurm cluster hostname
    user: str = ""  # Required for SSH tunnel: Your username
    job_dir: str = ""  # Required for SSH tunnel: Directory to store runs on cluster
    identity: str | None = None  # Optional for SSH tunnel: Path to SSH key for authentication

    def __post_init__(self):
        """Validate the configuration and raise descriptive errors."""
        if not self.account:
            raise ValueError("SlurmConfig.account must be set to your actual Slurm account")
        if not self.partition_cpu:
            raise ValueError("SlurmConfig.partition_cpu must be set")
        if not self.partition_gpu:
            raise ValueError("SlurmConfig.partition_gpu must be set")
        if not self.time:
            raise ValueError("SlurmConfig.time must be set to job time limit (e.g., '02:00:00')")
        if not self.container_image:
            raise ValueError("SlurmConfig.container_image must be set to container image for jobs")
        if not self.use_local_tunnel:
            # Only validate SSH tunnel settings if not using local tunnel
            if not self.host:
                raise ValueError(
                    "SlurmConfig.host must be set to your actual cluster hostname when using SSH tunnel"
                )
            if not self.user:
                raise ValueError(
                    "SlurmConfig.user must be set to your actual username when using SSH tunnel"
                )
            if not self.job_dir:
                raise ValueError(
                    "SlurmConfig.job_dir must be set to directory for storing runs on cluster"
                )

        self.env_vars |= {
            "CUDA_DEVICE_MAX_CONNECTIONS": "1",  # Disable GPU communication/computation overlap for performance
            "TRANSFORMERS_OFFLINE": "1",  # Disable online downloads from HuggingFace
            "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",  # Disable caching NCCL communication buffer memory
            "NCCL_NVLS_ENABLE": "0",  # Disable NVLink SHARP to save memory
        }


def create_slurm_executor(
    slurm_cfg: SlurmConfig, nodes: int = 1, ntasks_per_node: int = 1, num_gpus: int = 0
):
    # Configure tunnel
    if slurm_cfg.use_local_tunnel:
        # Use LocalTunnel when already on the cluster
        tunnel = run.LocalTunnel(job_dir=slurm_cfg.job_dir)
    else:
        # Use SSH tunnel when launching from local machine
        tunnel = run.SSHTunnel(
            host=slurm_cfg.host,
            user=slurm_cfg.user,
            job_dir=slurm_cfg.job_dir,
            identity=slurm_cfg.identity,  # can be None
        )

    if num_gpus > 0:
        return run.SlurmExecutor(
            account=slurm_cfg.account,
            partition=slurm_cfg.partition_gpu,
            ntasks_per_node=ntasks_per_node,
            gpus_per_node=num_gpus,
            nodes=nodes,
            tunnel=tunnel,
            container_image=slurm_cfg.container_image,
            container_mounts=slurm_cfg.container_mounts,
            time=slurm_cfg.time,
            packager=run.GitArchivePackager(),
            mem="0",
            gres=f"gpu:{num_gpus}",
        )
    else:
        return run.SlurmExecutor(
            account=slurm_cfg.account,
            partition=slurm_cfg.partition_cpu,
            nodes=nodes,
            tunnel=tunnel,
            container_image=slurm_cfg.container_image,
            container_mounts=slurm_cfg.container_mounts,
            time=slurm_cfg.time,
            packager=run.GitArchivePackager(),
            mem="0",
        )


def get_finetune_recipe(recipe_name: str):
    if not hasattr(getattr(llm, recipe_name), "finetune_recipe"):
        raise ValueError(f"Recipe {recipe_name} does not have a Fine-Tuning recipe")
    return getattr(llm, recipe_name).finetune_recipe(peft_scheme=None)


def read_chat_template(template_path: str):
    with open(template_path) as f:
        return f.read().strip()


def download_hf_dataset(dataset_name: str, output_dir: str | None = None):
    """Download a dataset from HuggingFace Hub using huggingface-cli."""
    cmd = ["huggingface-cli", "download", dataset_name, "--repo-type", "dataset"]

    if output_dir:
        cmd.extend(["--local-dir", output_dir])

    subprocess.run(cmd, check=True)
    print(f"Successfully downloaded dataset: {dataset_name}")
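
A hypothetical end-to-end usage sketch for these helpers follows. The account, partitions, container image, paths, and recipe name are placeholders rather than values from this commit, and it assumes the repository root is on PYTHONPATH and that nemo_run's run.run accepts the recipe together with the executor.

# Hypothetical wiring of the helpers above; all values are placeholders.
import nemo_run as run

# Assumes the repo root is on PYTHONPATH so examples/nemo_run/common/utils.py is importable.
from examples.nemo_run.common.utils import SlurmConfig, create_slurm_executor, get_finetune_recipe

slurm_cfg = SlurmConfig(
    account="my_account",                        # placeholder Slurm account
    partition_cpu="cpu_partition",               # placeholder CPU partition
    partition_gpu="gpu_partition",               # placeholder GPU partition
    time="04:00:00",
    container_image="nvcr.io/nvidia/nemo:dev",   # placeholder container image
    host="slurm.example.com",                    # placeholder SSH tunnel host
    user="jdoe",
    job_dir="/scratch/jdoe/nemo_runs",
)

recipe = get_finetune_recipe("llama3_8b")        # assumes this recipe module exists in nemo.collections.llm
executor = create_slurm_executor(slurm_cfg, nodes=1, ntasks_per_node=8, num_gpus=8)
run.run(recipe, executor=executor)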
